<a href="https://colab.research.google.com/github/RayRayKing/unacast_visit/blob/main/unacast_task.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Instructions

1. Upload the `daily_data` folder to the `/content` folder (Colab's default working directory).
2. Run all cells.

In [None]:
# Step 1: Load a sample file and review it for potential data issues.

import pandas as pd
import glob
import os

# locally saved path
daily_data_folder = '/content/daily_data/'

# csv path for all files
file_paths = glob.glob(f"{daily_data_folder}/*.csv")

first_file = pd.read_csv(file_paths[0])

# potential data issues

# missing values
print(first_file.isnull().sum())

# data type
print(first_file.dtypes)

# Additional data checks to consider:
# - Validate that every venue_id/visitor_id exists in the current reference database.
# - Time consistency: visit_start_time must not be later than visit_end_time.
# - Duplicate rows: determine whether they are genuine repeat visits or ingestion errors.

# Basic statistics
print(first_file.describe(include='all'))





venue_id             0
visitor_id           0
visit_start_time     0
visit_end_time      19
venue_type           3
dtype: int64
venue_id            object
visitor_id          object
visit_start_time    object
visit_end_time      object
venue_type          object
dtype: object
       venue_id visitor_id            visit_start_time              visit_end_time  venue_type
count      2182       2182                        2182                        2163        2179
unique       15       1087                         783                        2152           4
top       UN004      V0205  2024-11-15 09:00:00.000000  2024-11-15 15:43:35.462349  university
freq        348          6                           9                           2        1599
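The additional checks listed in the Step 1 comments can be sketched as a small helper. This is a minimal sketch, assuming the column names shown above; `run_quality_checks` and the `known_venue_ids` set (a stand-in for a lookup against the current database) are hypothetical and not part of the original notebook.

```python
import pandas as pd


def run_quality_checks(df: pd.DataFrame, known_venue_ids: set) -> dict:
    """Hypothetical helper: count the rows failing each Step 1 check."""
    start = pd.to_datetime(df["visit_start_time"], errors="coerce")
    end = pd.to_datetime(df["visit_end_time"], errors="coerce")
    return {
        # start later than end (comparisons involving NaT evaluate to False)
        "start_after_end": int((start > end).sum()),
        # exact duplicate rows: genuine repeat records or ingestion errors?
        "exact_duplicates": int(df.duplicated().sum()),
        # venue_ids missing from the reference set (stand-in for a DB lookup)
        "unknown_venue_ids": int((~df["venue_id"].isin(list(known_venue_ids))).sum()),
        # non-missing start timestamps that failed to parse
        "unparseable_start_times": int((start.isna() & df["visit_start_time"].notna()).sum()),
    }


# toy data for illustration only (not taken from daily_data)
sample = pd.DataFrame({
    "venue_id": ["UN004", "UN004", "XX999"],
    "visitor_id": ["V0205", "V0205", "V0001"],
    "visit_start_time": ["2024-11-15 09:00:00", "2024-11-15 09:00:00", "2024-11-15 12:00:00"],
    "visit_end_time": ["2024-11-15 10:00:00", "2024-11-15 10:00:00", "2024-11-15 11:00:00"],
    "venue_type": ["university", "university", "university"],
})
checks = run_quality_checks(sample, known_venue_ids={"UN004"})
print(checks)
```

Returning counts rather than raising keeps the helper usable for exploration; a production pipeline would more likely fail the run when a count exceeds an agreed threshold.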


In [None]:
import os
import pandas as pd


file_path = '/content/daily_data/20241028.csv'

def process_daily_file_df(file_path, summary_file_name='daily_visitation_summary.csv'):
    """
    Processes a daily file of venue visitation data, cleans it, computes metrics,
    and appends the results to a summary CSV file.

    Parameters:
    - file_path (str): Path to the daily CSV file to process.
    - summary_file_name (str): Path to the summary CSV file to save results. Default: 'daily_visitation_summary.csv'.
    """

    daily_data = pd.read_csv(file_path)

    # convert columns to their proper data types; unparseable timestamps become NaT
    daily_data['visit_start_time'] = pd.to_datetime(daily_data['visit_start_time'], errors='coerce')
    daily_data['visit_end_time'] = pd.to_datetime(daily_data['visit_end_time'], errors='coerce')
    daily_data['venue_id'] = daily_data['venue_id'].astype('string')
    daily_data['visitor_id'] = daily_data['visitor_id'].astype('string')
    daily_data['venue_type'] = daily_data['venue_type'].astype('string')

    # drop rows with missing or invalid data (done after the conversion so that
    # timestamps coerced to NaT are dropped as well)
    daily_data = daily_data.dropna()
    # An alternative would be to enrich the missing/invalid data using case-by-case
    # assumptions, given the business context, e.g.:
    # - a missing venue_type can be inferred from venue_id via a mapping built from historical data
    # - a missing visit_end_time can be set to 23:59:59 (true end of day), assuming no overnight stays

    # calculate metrics per venue
    results = daily_data.groupby('venue_id').agg(
        visitor_count_unique=('visitor_id', 'nunique'),
        visitor_count_total=('visitor_id', 'count')
    ).reset_index()

    # date column (extracted from the file name, e.g. '20241028')
    processing_date = os.path.basename(file_path).split('.')[0]
    results['date'] = processing_date

    # naive baseline: the mean visit count across all venues for this day
    overall_average = results['visitor_count_total'].mean()

    # use that overall average as the prediction for every venue
    results['visitor_count_total_prediction'] = overall_average

    # reorder columns to match the output schema
    results = results[['date', 'venue_id', 'visitor_count_unique', 'visitor_count_total', 'visitor_count_total_prediction']]

    # append to the summary file, writing the header only when the file is new
    if not os.path.exists(summary_file_name):
        results.to_csv(summary_file_name, index=False)
    else:
        results.to_csv(summary_file_name, mode='a', header=False, index=False)

    print(f"Processed and saved data for {processing_date}")


# Process a single file (useful for testing one day):
# process_daily_file_df(file_path)

# Loop through all files found in Step 1 (this produced the output below)
for file_path in file_paths:
    process_daily_file_df(file_path)

Processed and saved data for 20241115
Processed and saved data for 20241107
Processed and saved data for 20241028
Processed and saved data for 20241108
Processed and saved data for 20241110
Processed and saved data for 20241031
Processed and saved data for 20241104
Processed and saved data for 20241109
Processed and saved data for 20241111
Processed and saved data for 20241029
Processed and saved data for 20241102
Processed and saved data for 20241105
Processed and saved data for 20241106
Processed and saved data for 20241117
Processed and saved data for 20241113
Processed and saved data for 20241116
Processed and saved data for 20241112
Processed and saved data for 20241030
Processed and saved data for 20241114
Processed and saved data for 20241103
Processed and saved data for 20241101
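The enrichment alternative described in the cleaning comments (filling gaps instead of dropping rows) could look roughly like the sketch below. It assumes a `venue_id -> venue_type` mapping can be derived from historical rows where both are present, and that visits never run overnight; `enrich_missing_values`, the `history` frame, and the toy IDs are hypothetical.

```python
import pandas as pd


def enrich_missing_values(daily: pd.DataFrame, history: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical sketch: fill gaps instead of dropping rows."""
    out = daily.copy()

    # venue_id -> venue_type, using the most frequent type seen historically
    known = history.dropna(subset=["venue_type"])
    mapping = known.groupby("venue_id")["venue_type"].agg(lambda s: s.mode().iloc[0])
    out["venue_type"] = out["venue_type"].fillna(out["venue_id"].map(mapping))

    # missing end time -> 23:59:59 on the start date (assumes no overnight stays)
    start = pd.to_datetime(out["visit_start_time"], errors="coerce")
    end = pd.to_datetime(out["visit_end_time"], errors="coerce")
    end_of_day = start.dt.normalize() + pd.Timedelta(hours=23, minutes=59, seconds=59)
    out["visit_start_time"] = start
    out["visit_end_time"] = end.fillna(end_of_day)
    return out


# toy frames for illustration only (not taken from daily_data)
history = pd.DataFrame({
    "venue_id": ["UN004", "UN004", "MA001"],
    "venue_type": ["university", "university", "mall"],
})
daily = pd.DataFrame({
    "venue_id": ["UN004", "MA001"],
    "visitor_id": ["V0205", "V0001"],
    "visit_start_time": ["2024-11-15 09:00:00", "2024-11-15 10:30:00"],
    "visit_end_time": [None, "2024-11-15 11:00:00"],
    "venue_type": [None, "mall"],
})
result = enrich_missing_values(daily, history)
print(result)
```

Whether end-of-day is a sensible fill depends on how visit duration is used downstream; it inflates durations, so it may be preferable to keep a flag column marking imputed rows.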

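To make the two visitor counts and the baseline prediction concrete, here is the same named aggregation used in `process_daily_file_df`, run on a tiny hand-made frame (toy values, not from `daily_data`).

```python
import pandas as pd

# toy visits: V1 visits UN001 twice, so unique (2) differs from total (3) there
visits = pd.DataFrame({
    "venue_id": ["UN001", "UN001", "UN001", "UN002"],
    "visitor_id": ["V1", "V1", "V2", "V3"],
})

metrics = visits.groupby("venue_id").agg(
    visitor_count_unique=("visitor_id", "nunique"),
    visitor_count_total=("visitor_id", "count"),
).reset_index()

# same naive baseline as the notebook: one mean applied to every venue
metrics["visitor_count_total_prediction"] = metrics["visitor_count_total"].mean()
print(metrics)
```

UN001 gets 2 unique and 3 total visits, UN002 gets 1 and 1, and both venues receive the same prediction of 2.0, which shows why an overall average is only a placeholder baseline.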

# Questions
1. How would you scale this pipeline to handle:
 - 20 million locations of 20 million users
  
  I would move to distributed processing, most likely with Apache Spark. From there it is a matter of optimizing how the data is processed: choosing partition keys (for example venue_id and date) so that the workload is spread evenly across workers and no single partition becomes a bottleneck.

 - Real-time updates

  Use a messaging/queueing system such as Redis. It allows events to be processed in real time as they arrive, and during a sudden spike it buffers the overflow in the queue so that consumers keep working through the backlog instead of dropping data.
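The partitioning idea in the scaling answer can be illustrated without a cluster. The plain-Python sketch below stands in for Spark: it hash-partitions visits by `venue_id`, so all rows for a venue land in one partition and each partition can be aggregated independently before the partial results are merged. This is roughly what `df.repartition(n, "venue_id")` sets up in PySpark before a per-venue aggregation; `NUM_PARTITIONS` and the helper names are hypothetical.

```python
import zlib
from collections import Counter, defaultdict

NUM_PARTITIONS = 4  # hypothetical; on a cluster this follows executor/core counts


def partition_for(key: str, n: int = NUM_PARTITIONS) -> int:
    # crc32 is stable across processes, unlike Python's salted str hash()
    return zlib.crc32(key.encode("utf-8")) % n


def partition_visits(visits):
    partitions = defaultdict(list)
    for venue_id, visitor_id in visits:
        partitions[partition_for(venue_id)].append((venue_id, visitor_id))
    return partitions


def aggregate_partition(rows):
    # every row for a venue is in this partition, so no cross-partition shuffle is needed
    return Counter(venue_id for venue_id, _ in rows)


visits = [("UN001", "V1"), ("UN002", "V2"), ("UN001", "V3"), ("UN003", "V1")]
parts = partition_visits(visits)
totals = Counter()
for rows in parts.values():
    totals.update(aggregate_partition(rows))
print(dict(totals))
```

The choice of key matters: a key with a few very hot values (one huge venue) would overload a single partition, which is the skew problem the answer alludes to.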

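The buffering behaviour described for real-time updates can be sketched with Python's standard-library `queue` as a stand-in for Redis: a producer pushes a burst of visit events, the queue absorbs it, and a consumer thread keeps updating running per-venue counts. This illustrates the pattern only; it is not Redis client code, and the event shape is an assumption.

```python
import queue
import threading
from collections import Counter

events = queue.Queue()  # unbounded buffer; stand-in for a Redis list/stream
running_totals = Counter()
STOP = None  # sentinel telling the consumer to exit


def consumer():
    while True:
        event = events.get()
        if event is STOP:
            events.task_done()
            break
        running_totals[event["venue_id"]] += 1  # incremental update per event
        events.task_done()


worker = threading.Thread(target=consumer)
worker.start()

# simulate a sudden spike: 1,000 events enqueued at once
for i in range(1000):
    events.put({"venue_id": f"UN00{i % 3}", "visitor_id": f"V{i}"})
events.put(STOP)

events.join()  # wait until every queued item has been processed
worker.join()
print(dict(running_totals))
```

A real deployment would also need durability and acknowledgement semantics (so events survive a consumer crash), which an in-process queue does not provide.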
2. How would you store the data?

Data lake style, on the cloud provider's object (blob) storage. On GCP that is Google Cloud Storage (buckets).
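One common way to organise such a data lake is Hive-style partitioning by date, so one day's data can be read or rewritten without scanning the rest. The sketch only builds object paths from the notebook's `YYYYMMDD.csv` file names; the bucket name `example-visits-lake`, the `visits` prefix, and the Parquet file naming are hypothetical.

```python
import os
from datetime import date, datetime


def day_from_filename(path: str) -> date:
    # daily files are named YYYYMMDD.csv, e.g. 20241028.csv
    stem = os.path.basename(path).split(".")[0]
    return datetime.strptime(stem, "%Y%m%d").date()


def object_path(bucket: str, dataset: str, day: date, part: int) -> str:
    # Hive-style "key=value" folders let engines such as Spark prune partitions by date
    return f"gs://{bucket}/{dataset}/date={day:%Y%m%d}/part-{part:05d}.parquet"


day = day_from_filename("/content/daily_data/20241028.csv")
print(object_path("example-visits-lake", "visits", day, 0))
# gs://example-visits-lake/visits/date=20241028/part-00000.parquet
```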

3. How would you monitor data quality?

This would consist of strong testing throughout the pipeline. It can be done with Python tests (Great Expectations), SQL tests (dbt), or external data observability tools (Monte Carlo).
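Whichever tool is chosen, the underlying checks tend to be assertions on each day's data. Below is a minimal pandas-only sketch, assuming two example rules: the row count must stay within a tolerance of the recent average, and each column's null rate must stay under a ceiling. The thresholds and the `monitor_day` name are hypothetical, and this is not the Great Expectations or dbt API.

```python
import pandas as pd


def monitor_day(today: pd.DataFrame, recent_row_counts: list,
                max_null_rate: float = 0.05, tolerance: float = 0.5) -> list:
    """Return human-readable failures; an empty list means the day passed."""
    failures = []
    baseline = sum(recent_row_counts) / len(recent_row_counts)
    # volume check: flag days far above or below the recent average
    if abs(len(today) - baseline) > tolerance * baseline:
        failures.append(f"row count {len(today)} deviates from baseline {baseline:.0f}")
    # completeness check: share of missing values per column
    for column, rate in today.isnull().mean().items():
        if rate > max_null_rate:
            failures.append(f"{column} null rate {rate:.1%} exceeds {max_null_rate:.0%}")
    return failures


# toy data for illustration only
today = pd.DataFrame({"venue_id": ["UN001", "UN002", None, "UN003"],
                      "visitor_id": ["V1", "V2", "V3", "V4"]})
failures = monitor_day(today, recent_row_counts=[4, 5, 3])
print(failures)
```

In the sample above, Step 1 found 19 missing `visit_end_time` values out of 2182 rows (under 1%), which a 5% ceiling would let through; the right threshold is a business decision.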

4. What would your daily orchestration look like?

On Apache Airflow, most likely as a DAG of simple extract -> transform -> load tasks, with retries and error handling where needed.
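In Airflow, retries are configured per task through operator arguments such as `retries` and `retry_delay`. The plain-Python sketch below mimics an extract -> transform -> load chain with retry and exponential backoff so the control flow is visible without an Airflow install. The task functions are hypothetical placeholders, not the notebook's code, and the retry loop is a simplification of what the scheduler does.

```python
import time


def run_with_retries(task, retries=3, delay_seconds=0.01):
    """Run task(); on failure wait and retry, doubling the delay each time."""
    for attempt in range(retries + 1):
        try:
            return task()
        except Exception as exc:
            if attempt == retries:
                raise  # retries exhausted: surface the error
            print(f"{task.__name__} failed ({exc}); retry {attempt + 1}/{retries}")
            time.sleep(delay_seconds * 2 ** attempt)


calls = {"extract": 0}


def extract():
    calls["extract"] += 1
    if calls["extract"] == 1:
        raise ConnectionError("transient storage error")  # simulated flaky source
    return ["20241028.csv", "20241029.csv"]


def transform(files):
    return {f.split(".")[0]: "processed" for f in files}


def load(rows):
    return len(rows)


files = run_with_retries(extract)
rows = run_with_retries(lambda: transform(files))
loaded = run_with_retries(lambda: load(rows))
print(loaded)
```

The first `extract` attempt fails and is retried once; `transform` and `load` then run in order, mirroring task dependencies in a DAG.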