# Sanity Checks on Raw Mobile Positioning Data (MPD)

## Summary
This notebook runs sanity checks on raw mobile positioning data using PySpark RDDs. The checks assess data quality by detecting missing columns, hidden characters, inconsistent data types, impossible timestamps, forbidden values, and missing values. Each line is inspected individually by the `check_line()` function.

Set `RAW_FILE_PATH` if you use a merged dataset (7 fields, combining subs data and cells data), or set both `RAW_SUBS_PATH` and `RAW_CELLS_PATH` if the data are still unmerged (4 fields each).

## Data Structure

To work with Mobile Positioning Data (MPD), the minimum required fields are listed below:

### Option 1: Records/Events Data Already Merged with Cells Location Data

| Field Name   | Type      | Mode     | Description                                          |
|--------------|-----------|----------|------------------------------------------------------|
| `msisdn`     | String    |          | Hashed subscriber identifier                         |
| `datetime`   | Timestamp |          | Transaction date (date and hour)                     |
| `cell_id`    | String    | NULLABLE | Hashed cell identifier                               |
| `latitude`   | Float     |          | Latitude of Base Transceiver Station (BTS)           |
| `longitude`  | Float     |          | Longitude of Base Transceiver Station (BTS)          |
| `data_type`  | String    |          | Data source, can be CDR/CHG or IPDR/UPCC             |
| `service`    | String    |          | Transaction service (4G/ 3G/ 2G)                     |

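As a minimal sketch of how one merged row maps onto the table above, the snippet below splits a line and casts each field to its listed type. The sample values, the `FIELD_NAMES` list, and the `parse_merged_row` helper are hypothetical and only illustrate the layout; they are not part of the notebook's scripts.

```python
from datetime import datetime

# Hypothetical sample row in the merged (Option 1) layout; the values are made up.
sample_line = "a1b2c3,2024-06-04 13:05:00,cell_001,-1.2921,36.8219,CDR,4G"

# Field order assumed to follow the table above
FIELD_NAMES = ["msisdn", "datetime", "cell_id", "latitude", "longitude", "data_type", "service"]

def parse_merged_row(line, delimiter=",", datetime_format="%Y-%m-%d %H:%M:%S"):
    """Split one merged row and cast each field to the type listed in the table."""
    values = line.split(delimiter)
    if len(values) != len(FIELD_NAMES):
        raise ValueError(f"expected {len(FIELD_NAMES)} fields, got {len(values)}")
    row = dict(zip(FIELD_NAMES, values))
    row["datetime"] = datetime.strptime(row["datetime"], datetime_format)
    row["latitude"] = float(row["latitude"])
    row["longitude"] = float(row["longitude"])
    return row

row = parse_merged_row(sample_line)
print(row["datetime"].hour, row["latitude"], row["service"])
```
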
### Option 2: Records Data Not Merged with Cells Location Data

#### Subs Records Data

| Field Name   | Type      | Mode     | Description                                          |
|--------------|-----------|----------|------------------------------------------------------|
| `msisdn`     | String    |          | Hashed subscriber identifier                         |
| `datetime`   | Timestamp |          | Transaction date (date and hour)                     |
| `cell_id`    | String    |          | Hashed cell identifier                               |
| `data_type`  | String    |          | Data source, can be CDR/CHG or IPDR/UPCC             |

#### Cells Data

| Field Name   | Type      | Mode     | Description                                          |
|--------------|-----------|----------|------------------------------------------------------|
| `cell_id`    | String    |          | Hashed cell identifier                               |
| `latitude`   | Float     |          | Latitude of Base Transceiver Station (BTS)           |
| `longitude`  | Float     |          | Longitude of Base Transceiver Station (BTS)          |
| `service`    | String    |          | Transaction service (4G/ 3G/ 2G)                     |

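The two Option 2 tables share the `cell_id` field, so they can be joined into the 7-field Option 1 layout. The snippet below is a rough sketch of that join in plain Python, assuming a left join that keeps records whose cell is unknown. The sample records and the `merge_subs_with_cells` helper are hypothetical and are not part of the notebook's scripts.

```python
# Hypothetical sample records; the values are made up for illustration.
subs_rows = [
    {"msisdn": "a1b2c3", "datetime": "2024-06-04 13:05:00", "cell_id": "cell_001", "data_type": "CDR"},
    {"msisdn": "d4e5f6", "datetime": "2024-06-04 13:07:00", "cell_id": "cell_999", "data_type": "IPDR"},
]
cells_rows = [
    {"cell_id": "cell_001", "latitude": -1.2921, "longitude": 36.8219, "service": "4G"},
]

def merge_subs_with_cells(subs, cells):
    """Left-join subscriber records to cell locations on cell_id."""
    cells_by_id = {cell["cell_id"]: cell for cell in cells}
    merged = []
    for record in subs:
        # An unknown cell_id yields an empty dict, so the location fields become None
        cell = cells_by_id.get(record["cell_id"], {})
        merged.append({
            "msisdn": record["msisdn"],
            "datetime": record["datetime"],
            "cell_id": record["cell_id"],
            "latitude": cell.get("latitude"),
            "longitude": cell.get("longitude"),
            "data_type": record["data_type"],
            "service": cell.get("service"),
        })
    return merged

merged_rows = merge_subs_with_cells(subs_rows, cells_rows)
for merged_row in merged_rows:
    print(merged_row)
```
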
Please follow the same structure, or adjust the script as needed, and make sure the file paths are set correctly. If your input data is still split across several files, combine them first using the `combine_csv_files` function.

## Requirements

- PySpark installed on the local machine / PySpark Cluster with HDFS 

- Required packages and dependencies installed (pyspark, pandas, geopandas, folium, tqdm)

- Raw mobile phone data (MPD) in CSV / Parquet Format.


**To run this notebook, you must install Spark locally or have access to a Spark cluster, and install all package dependencies.**

For setup instructions, see the [Environment Notebook: Mac & Linux Users](https://github.com/mandes95/893SSA-2022-BDT-DKH/blob/main/notebook/00-Setup%20Environment%20%5Bmac%20%26%20linux%5D.ipynb) or the [Environment Notebook: Windows Users](https://github.com/mandes95/893SSA-2022-BDT-DKH/blob/main/notebook/00-Setup%20Environment%20%5Bwindows%5D.ipynb).


In [None]:
import glob
import pandas as pd

# Helper function that combines all CSV files with the same format in a folder into one CSV file.

def combine_csv_files(folder_path, combined_file_name):
    """
    Combines all CSV files in a folder into one combined CSV file.
    
    Parameters:
    - folder_path (str): Path to the folder containing CSV files.
    - combined_file_name (str): Name of the combined CSV file to be created.
    
    Returns:
    - None
    """
    # Ensure folder path ends with '/'
    if not folder_path.endswith('/'):
        folder_path += '/'
    
    # Get a list of all CSV files in the folder
    csv_files = glob.glob(folder_path + '*.csv')
    
    # Initialize an empty list to hold all data frames
    df_list = []
    
    # Iterate over the list of CSV files
    for csv_file in csv_files:
        # Read each CSV file into a pandas DataFrame
        df = pd.read_csv(csv_file)
        # Append the DataFrame to the list
        df_list.append(df)
    
    # Concatenate all DataFrames in the list along the rows axis
    combined_df = pd.concat(df_list, axis=0, ignore_index=True)
    
    # Write the combined DataFrame to a single CSV file
    combined_df.to_csv(folder_path + combined_file_name, index=False)
    
    print(f'Combined CSV file successfully saved as {combined_file_name} in {folder_path}')

# Example usage:
# Assuming you have multiple CSV files in a folder named 'data_files'
# and you want to combine them into a single file named 'combined_data.csv'

# folder_path = '../data/00_Input/'
# combined_file_name = '../combined_data.csv'

# combine_csv_files(folder_path, combined_file_name)


In [None]:
# Import the sys module to adjust the Python system path
import sys

# Append a path to the system path so that Python can import modules from the specified directory (relative path)
sys.path.append('../')

# Import all configuration variables (for example BASE_PATH and SANITY_PATH) from the script.conf module
from script.conf import *

# (Temporary) Replace the configuration here or in script/conf.py if needed 
# option 1
RAW_FILE_PATH = "mpd_synthetic_data_ken_100subs_sanity.csv"  # File path for the raw data file for checking
#RAW_FILE_PATH = "combined_data.csv"  # File path for the raw data file

# option 2
#RAW_SUBS_PATH = "mpd_synthetic_data_20240604_v3_adjust_dirty_subs.csv"  # File path for the raw subs data file
#RAW_CELLS_PATH = "mpd_synthetic_data_20240604_v3_adjust_dirty_cells.csv"  # File path for the raw cells data file

In [None]:
# Import the os module to interact with the operating system
import os

try:
    # Check if the merged file exists
    flag = os.path.exists(BASE_PATH+RAW_FILE_PATH)
    if flag:
        USE_MERGED = True
        SANITY_FILE_PATH = RAW_FILE_PATH.replace(".csv","_sanity.jsonl")

    else:
        USE_MERGED = False
        SANITY_FILE_PATH = [RAW_SUBS_PATH.replace(".csv","_sanity.jsonl"),RAW_CELLS_PATH.replace(".csv","_sanity.jsonl")]
        print("Merged file not found. Proceeding with the unmerged option. Please check the configuration of RAW_SUBS_PATH and RAW_CELLS_PATH")

except (NameError, TypeError):
    # A path variable such as RAW_FILE_PATH is undefined or None; fall back to the unmerged option
    USE_MERGED = False
    SANITY_FILE_PATH = [RAW_SUBS_PATH.replace(".csv","_sanity.jsonl"),RAW_CELLS_PATH.replace(".csv","_sanity.jsonl")]
    print("Merged file not found. Proceeding with the unmerged option. Please check the configuration of RAW_SUBS_PATH and RAW_CELLS_PATH")

# If the merged file is not used, check that the unmerged inputs exist
if USE_MERGED == False:
    try:
        # Check if both RAW_SUBS_PATH and RAW_CELLS_PATH exist
        flag = os.path.exists(BASE_PATH+RAW_SUBS_PATH) and os.path.exists(BASE_PATH+RAW_CELLS_PATH)
        if not flag:
            USE_MERGED = None
            SANITY_FILE_PATH = None
            print("File(s) not found. Please check your input path again")
    except (NameError, TypeError):
        # os.path.exists() does not raise FileExistsError; an undefined or None path is the realistic failure
        USE_MERGED = None
        SANITY_FILE_PATH = None
        print("File(s) not found. Please check your input path again")

print("Use Merge: {}".format(USE_MERGED))
print(f"Target output path: {SANITY_FILE_PATH}")

In [None]:
# Import the os module to interact with the operating system
import os

# Try to create a new directory at the path SANITY_PATH using os.mkdir()
# If the directory already exists, print a message stating that it does
try:
    os.mkdir(SANITY_PATH)
    print("Create new folder {}".format(SANITY_PATH))
except FileExistsError:
    print("Folder {} already exists".format(SANITY_PATH))

# Start PySpark

In [None]:
# Import the PySpark module
import pyspark
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType

import pyspark.sql.functions as f
from pyspark.sql.functions import year, month, dayofmonth, substring, col, to_date
from pyspark.sql.window import Window

# Import the SparkSession from PySpark
from pyspark.sql import SparkSession
CORE = 1
# If you get an error due to port configuration, run this command in a terminal:
# sudo hostname -s 127.0.0.1
# Create a local SparkSession with the specified configuration
spark = SparkSession.builder\
        .master("local[{}]".format(CORE))\
        .appName("01.ITU.PySpark-Sanity-Checks")\
        .config('spark.sql.execution.arrow.pyspark.enabled', 'true')\
        .config('spark.eventLog.gcMetrics.youngGenerationGarbageCollectors', 'true')\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Print the spark object which contains the SparkSession
spark

In [None]:
# Use the code below to connect to a remote Spark cluster instead.

# # Import the PySpark module
# import pyspark
# from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType

# import pyspark.sql.functions as f
# from pyspark.sql.functions import year, month, dayofmonth, substring, col, to_date
# from pyspark.sql.window import Window

# # Import the SparkSession from PySpark
# from pyspark.sql import SparkSession

# CLUSTER_URL = "spark://master"
# PORT = "7077"

# # Create a SparkSession with the specified configuration remote server
# spark = SparkSession.builder\
#         .master("{}:{}".format(CLUSTER_URL,PORT))\
#         .appName("01.ITU.PySpark-Raw")\
#         .getOrCreate()

# # Print the spark object which contains the SparkSession
# spark

# Define Checker Function
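
The timestamp checks in this section rest on `datetime.strptime`, which rejects both malformed strings and dates that cannot exist. The snippet below is a small standalone sketch of that behavior; the `parses_as` helper and the sample values are hypothetical, and the format string is assumed to match the notebook's default `datetime_format`.

```python
from datetime import datetime

def parses_as(value, fmt="%Y-%m-%d %H:%M:%S"):
    """Return True if value matches fmt and is a real calendar date and time."""
    try:
        datetime.strptime(value, fmt)
        return True
    except (ValueError, TypeError):
        return False

checks = {
    "2024-06-04 13:05:00": parses_as("2024-06-04 13:05:00"),
    "2024-02-30 13:05:00": parses_as("2024-02-30 13:05:00"),  # February 30 does not exist
    "2024-06-04 25:05:00": parses_as("2024-06-04 25:05:00"),  # hour 25 is out of range
    "": parses_as(""),                                        # empty value
}
for value, is_valid in checks.items():
    print(repr(value), is_valid)
```

Only the first value passes; the other three are rejected without raising an exception, which is what lets a line-by-line check keep going after a bad row.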

In [None]:
import re  # repeated here so this cell can run on its own

def visible_chars(value):
    # Return a match object if the value contains a disallowed character, otherwise None.
    # The pattern has three alternatives:
    # - [^\w\s,.;:'"@#$*()+=\-/] matches anything that is not alphanumeric, whitespace, or an allowed symbol
    # - [\?!] flags question marks and exclamation marks explicitly
    # - [\x00-\x1F\x7F-\xFF] matches control and extended-ASCII characters; the ranges must sit inside
    #   a character class, otherwise they only match the literal sequence \x00, '-', \x1F
    pattern = r"[^\w\s,.;:'\"@#$*()+=\-/]+|[\?!]|[\x00-\x1F\x7F-\xFF]"

    # Search for the regex pattern in the given value
    return re.search(pattern, value)

def is_valid_timestamp(timestamp):
    try:
        if isinstance(timestamp, datetime):
            return True

        # Collapse extra spaces and coerce to str
        val = " ".join(str(timestamp).split())

        # Try your configured format first (if present), then a set of common variants
        formats = []
        try:
            formats.append(datetime_format)  # your existing format
        except NameError:
            pass

        # Common US-style variants:
        # - 12h w/ seconds, AM/PM
        # - 12h w/o seconds, AM/PM
        # - 24h w/ seconds
        # - 24h w/o seconds   <-- this covers '9/22/2025 11:34'
        formats.extend([
            "%m/%d/%Y %I:%M:%S %p",
            "%m/%d/%Y %I:%M %p",
            "%m/%d/%Y %H:%M:%S",
            "%m/%d/%Y %H:%M",
        ])

        for fmt in formats:
            try:
                datetime.strptime(val, fmt)
                return True
            except (ValueError, TypeError):
                continue

        return False
    except Exception:
        return False

def is_valid_datatype(value, data_type):
    try:
        if data_type == datetime:
            # Already a datetime? it's valid.
            if isinstance(value, datetime):
                return True

            # Normalize whitespace and coerce to string
            val = " ".join(str(value).split())

            # Try your existing format first (if defined), then common variants
            datetime_formats = []
            try:
                datetime_formats.append(datetime_format)  # your configured format
            except NameError:
                pass

            datetime_formats.extend([
                "%m/%d/%Y %I:%M:%S %p",  # e.g., 9/22/2025 6:05:00 AM
                "%m/%d/%Y %I:%M %p",     # e.g., 9/22/2025 6:05 AM
                "%m/%d/%Y %H:%M:%S",     # e.g., 9/22/2025 11:34:10
                "%m/%d/%Y %H:%M",        # e.g., 9/22/2025 11:34
            ])

            for fmt in datetime_formats:
                try:
                    datetime.strptime(val, fmt)
                    return True
                except (ValueError, TypeError):
                    continue
            return False
        else:
            data_type(value)
            return True
    except (ValueError, TypeError):
        return False

def listing_issues(case,value,verbose=True):
    # Build one issue record for a detected case
    if verbose:
        # Verbose: include both the case label and the offending value
        return {'case':case,'value':value}
    else:
        # Non-verbose: include only the case label
        return {'case':case}

def check_issues(issues,size):
    # Preview the issues found in the first `size` rows
    for issue in issues[:size]:
        # 'case_type' holds the list of issues for the row; rows with an empty list are skipped
        if issue['case_type']:
            # Print the row number and the issues detected for that row
            print(f"Row {issue['row']}: Issues detected -> {issue['case_type']}")

def write_issues(issues,output_path):
    parent_dir = os.path.dirname(output_path)
    os.makedirs(parent_dir, exist_ok=True)
    # It takes two parameters: issues (a list of dictionaries) and output_path (the path to the output file)
    # Open the output file in write mode, using a context manager to ensure proper file handling
    with open(output_path, 'w') as f:
        # Iterate over each issue in the issues list
        for issue in tqdm(issues):
            # Check if the 'case_type' key of the issue dictionary has a length greater than 0
            if len(issue['case_type'])>0:
                # If the condition is satisfied, write the issue dictionary to the file as JSON
                json.dump(issue, f)
                # Write a new line character to separate each issue
                f.write('\n')
                
                
def check_line(line_index, line, field_data_types, skip_header=True, verbose=False):
    # Parameters: line_index (the index of the current line), line (the line already split into a list of field values),
    # field_data_types (a list of (field name, data type, allowed values or range) tuples, one per expected field),
    # skip_header (whether to skip the header line, default is True), and verbose (whether to include the offending value in each issue, default is False)
    # Create an empty list to store any issues encountered while checking the line
    issues = list()
    # The expected number of fields is derived from field_data_types
    expected_num_fields = len(field_data_types)

    # Check if the header line should be skipped and if the current line is the first line
    if skip_header and (line_index == 0):
        # Return a dictionary containing the row index and the empty issues list
        return {'row': line_index, 'case_type': issues}
    
    # Case 1: Check for missing columns
    if len(line) != expected_num_fields:
        # Create a new issue using the listing_issues() function, passing the appropriate arguments
        # The case argument is set to 'Case 1 - Missing Column', and the value argument is set to the current line
        # The verbose argument is passed to the listing_issues() function as well
        issues.append(listing_issues(case='Case 1 - Missing Column', value=line, verbose=verbose))

    # Cases 2, 3, 4, 5, 6 : Check characters, data types, timestamp, forbidden value, missing value
    for i, value in enumerate(line): # i =  col index, value = col value
        
        if i >= expected_num_fields:  # Skip if index exceeds the predefined types
            continue
        
        field_name, expected_type, example_value = field_data_types[i]
        
        # Detect hidden or inappropriate characters (non-printable or control characters)
        if visible_chars(value):
            issues.append(listing_issues(case=f'Case 2 - Hidden Characters: {field_name}',value=value,verbose=verbose))

        # Check data types consistency
        if not is_valid_datatype(value, expected_type):
            issues.append(listing_issues(case=f'Case 3 - Inconsistent Data Type: {field_name}',value=value,verbose=verbose))
        
        # Check for impossible timestamp
        if expected_type == datetime and not is_valid_timestamp(value):
            issues.append(listing_issues(case='Case 4 - Impossible Timestamp',value=value,verbose=verbose))

        # Check invalid value
        if example_value is not None:
            try:
                _value = expected_type(value)
                if (expected_type in [str]) and (_value not in example_value):
                    issues.append(listing_issues(case=f'Case 5 - Invalid Value: {field_name}',value=value,verbose=verbose))
                
                if (expected_type in [int,float]) and ((_value < example_value[0]) or (_value > example_value[1])):
                    issues.append(listing_issues(case=f'Case 5 - Invalid Value: {field_name}',value=value,verbose=verbose))
            except (ValueError, TypeError):
                # The value cannot be cast to the expected type; Case 3 already reports that
                pass

        # Check if missing value
        if value == "":
            issues.append(listing_issues(case=f'Case 6 - Missing Value: {field_name}',value=value,verbose=verbose))

    return {'row': line_index, 'case_type': issues}

def detect_data_issues(lines_with_index, field_data_types, skip_header=True,verbose=False):
    # The function detect_data_issues takes the following parameters:
    # - lines_with_index: An RDD of (index, fields) pairs, one per line
    # - field_data_types: A list of (field name, data type, allowed values or range) tuples, one per expected field
    # - skip_header: A boolean indicating whether to skip the header line (default is True)
    # - verbose: A boolean indicating whether to include the offending value in each issue (default is False)

    # The function uses the map method on 'lines_with_index' and applies a lambda function to each line.
    # The lambda function calls another function 'check_line' and passes it the necessary arguments.
    # Finally, the collect method is used to retrieve the result as a list.
    return lines_with_index.map(lambda line: check_line(line[0], line[1], field_data_types,skip_header,verbose)).collect()

# Read Raw MPD Line by Line
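
The cell in this section pairs every line with its index, splits it on the delimiter, and uses the most common field count as a hint for the file layout. The snippet below is a plain-Python sketch of those same steps without Spark, where `enumerate` stands in for `zipWithIndex()`. The sample lines and the `short_rows` list are hypothetical.

```python
from statistics import mode

# Hypothetical file content; the last row is missing the cell_id field.
raw_lines = [
    "msisdn,datetime,cell_id,data_type",
    "a1b2c3,2024-06-04 13:05:00,cell_001,CDR",
    "d4e5f6,2024-06-04 13:07:00,IPDR",
]
delimiter = ","

# Most common field count across all lines
field_len = mode(len(line.split(delimiter)) for line in raw_lines)

# (index, fields) pairs, the same shape that is passed to check_line()
indexed_lines = [(index, line.split(delimiter)) for index, line in enumerate(raw_lines)]

# Indices of rows whose field count differs from the most common one
short_rows = [index for index, fields in indexed_lines if len(fields) != field_len]

print(f"fields len: {field_len}")
print(f"rows with an unexpected field count: {short_rows}")
```

Note that a plain `split` does not handle quoted fields that contain the delimiter, so this sketch (like the line-by-line approach) assumes unquoted values.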

In [None]:
# library
import re  # Importing the regular expression module
from tqdm import tqdm
from datetime import datetime  # Importing the datetime class from the datetime module
import json  # Importing the JSON module
from statistics import mode

# configuration
delimiter = ","  # The delimiter character used to split lines
datetime_format = "%Y-%m-%d %H:%M:%S"  # The format of the datetime value
#datetime_format = "%Y-%m-%d %H:%M"  # The format of the datetime value

# opt 1 fields
field_data_types = [
    ('msisdn',str, None),         # Field name, data type, and additional constraints
    ('datetime',datetime, None),  # Field name, data type, and additional constraints
    ('cell_id',str, None),        # Field name, data type, and additional constraints
    ('latitude',float, [-90.0,90.0]),  # Field name, data type, and additional constraints (latitude)
    ('longitude',float, [-180.0,180.0]),  # Field name, data type, and additional constraints (longitude)
    ('data_type',str, ["CDR","IPDR"]),   # Field name, data type, and allowed values for data_type
    ('service',str, ["2G","3G","4G","5G"])  # Field name, data type, and allowed values for service
]

# opt 2 fields
field_data_types_subs = [field_data_types[i] for i in [0,1,2,5]]  # Subset of field_data_types for subs dataset
field_data_types_cells = [field_data_types[i] for i in [2,3,4,6]]  # Subset of field_data_types for cells dataset

if USE_MERGED==True: #opt 1
    # Load a text file from the given path using Spark's context
    # Note: The following variables need to be defined in order for this line to work:
    #   - spark: SparkSession object
    #   - BASE_PATH: Base path of the file
    #   - RAW_FILE_PATH: Path of the raw file
    print("Start Checking MPD file")
    lines = spark.sparkContext.textFile(BASE_PATH + RAW_FILE_PATH)
    field_len = lines.map(lambda line: len(line.split(delimiter))).collect()
    field_len = mode(field_len)
    print(f"fields len: {field_len}")

    # Zip each line with its corresponding index, then split each line by the specified delimiter
    indexed_lines = lines.zipWithIndex().map(lambda line: (line[1], line[0].split(delimiter)))
    detected_issues = detect_data_issues(indexed_lines, field_data_types, verbose=True)
    check_issues(detected_issues,10)
    write_issues(detected_issues, SANITY_PATH+"/"+SANITY_FILE_PATH)
    
elif USE_MERGED==False:
    # check subs file
    print("Start Checking Subs file")
    lines = spark.sparkContext.textFile(BASE_PATH + RAW_SUBS_PATH)
    field_len = lines.map(lambda line: len(line.split(delimiter))).collect()
    field_len = mode(field_len)
    print(f"fields len: {field_len}")

    # Zip each line with its corresponding index, then split each line by the specified delimiter
    indexed_lines = lines.zipWithIndex().map(lambda line: (line[1], line[0].split(delimiter)))
    detected_issues = detect_data_issues(indexed_lines, field_data_types_subs, verbose=True)
    check_issues(detected_issues,10)
    write_issues(detected_issues, SANITY_PATH+"/"+SANITY_FILE_PATH[0])
    print("\n")

    # check cell file
    print("Start Checking Cells file")
    lines = spark.sparkContext.textFile(BASE_PATH + RAW_CELLS_PATH)
    field_len = lines.map(lambda line: len(line.split(delimiter))).collect()
    field_len = mode(field_len)
    print(f"fields len: {field_len}")

    # Zip each line with its corresponding index, then split each line by the specified delimiter
    indexed_lines = lines.zipWithIndex().map(lambda line: (line[1], line[0].split(delimiter)))
    detected_issues = detect_data_issues(indexed_lines, field_data_types_cells, verbose=True)
    check_issues(detected_issues,10)
    write_issues(detected_issues, SANITY_PATH+"/"+SANITY_FILE_PATH[1])

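Once the JSONL output has been written, a quick tally per case type helps decide what to clean first. The snippet below is a minimal sketch of such a summary, assuming each line has the `row` and `case_type` keys produced by `write_issues()`. The `summarize_issues` helper and the sample records are hypothetical.

```python
import json
from collections import Counter

def summarize_issues(jsonl_lines):
    """Count how often each case label appears across issue records."""
    counts = Counter()
    for raw in jsonl_lines:
        record = json.loads(raw)
        for issue in record["case_type"]:
            # Keep only the part before ':' so per-field cases are grouped together
            counts[issue["case"].split(":")[0]] += 1
    return counts

# Hypothetical records in the shape written by write_issues()
sample_jsonl = [
    '{"row": 3, "case_type": [{"case": "Case 6 - Missing Value: cell_id", "value": ""}]}',
    '{"row": 7, "case_type": [{"case": "Case 1 - Missing Column", "value": ["a", "b"]}, '
    '{"case": "Case 6 - Missing Value: msisdn", "value": ""}]}',
]

summary = summarize_issues(sample_jsonl)
for case, count in summary.most_common():
    print(f"{case}: {count}")
```

To summarize a real output file, pass an open file handle instead of the sample list, since iterating over a text file yields one JSON line at a time.
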
In [None]:
spark.stop()