# Devices pre-processing

This notebook loads the raw devices CSV exports, cleans the data, stores the page metadata in MongoDB, and writes the processed data to Parquet files.

## Import libraries

In [1]:
import sys

# Add the parent directory to the module search path.
# NOTE(review): presumably so the local `utils` module imported in the next
# cell can be found — confirm against the repository layout.
sys.path.append('..')

In [2]:
import os
import yaml
import requests
import numpy as np
import pandas as pd

from pymongo import MongoClient

from utils import get_csv_files, memory_usage, \
    correct_encoding, update_page_metadata

## Load the data

In [3]:
# Project-relative locations; everything below is resolved from BASE_PATH.
BASE_PATH = '../../'

CONFIG_DIR = os.path.join(BASE_PATH, 'config')
STORAGE_DIR = os.path.join(BASE_PATH, 'storage')
DATA_DIR = os.path.join(BASE_PATH, 'data')

# Load the environment configuration. The file is opened in a `with` block so
# the handle is closed as soon as parsing finishes (it was previously leaked).
# NOTE(review): FullLoader is not intended for untrusted input; if env.yml only
# holds plain scalars, yaml.safe_load would be sufficient — confirm before
# switching.
with open(os.path.join(CONFIG_DIR, 'env.yml')) as config_file:
    config = yaml.load(config_file, Loader=yaml.FullLoader)

In [4]:
model = 'devices'
model_storage = os.path.join(STORAGE_DIR, model)

# exist_ok avoids the check-then-create race and keeps the cell re-runnable
os.makedirs(model_storage, exist_ok=True)

# NOTE(review): the credentials are interpolated into the URI as-is; pymongo
# expects reserved characters in them to be percent-escaped — confirm how they
# are stored in env.yml.
client = MongoClient('mongodb://{}:{}@{}:{}'.format(config['MONGO_USERNAME'], config['MONGO_PASSWORD'],
                                                    config['MONGO_HOST'], config['MONGO_PORT']))
metadata_db = client[config['MONGO_DATABASE']]

model_metadata = metadata_db[model]

# Start from a clean slate: this removes EVERY document in the collection and
# re-creates a single metadata document with an empty page list.
model_metadata.delete_many({})
model_metadata.insert_one({'pages': []})

<pymongo.results.InsertOneResult at 0x7f672b39dbc0>

In [5]:
# glob pattern matching every raw csv export of this model
raw_csv_pattern = os.path.join(DATA_DIR, 'raw', model, '*.csv')
csv_files = get_csv_files(raw_csv_pattern)

In [6]:
# fail fast: nothing below makes sense without at least one input file
if not csv_files:
    missing_files_message = "Couldn't find any csv files! Please make sure the filepath exists"
    raise FileNotFoundError(missing_files_message)

In [7]:
# peek at the first rows of the first file only: enough to list the columns
# and their inferred dtypes without loading the whole export
df = pd.read_csv(
    csv_files[0],
    sep=';',
    parse_dates=['created_at'],
    nrows=100,
)

df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   id            100 non-null    int64         
 1   model         100 non-null    object        
 2   manufacturer  100 non-null    object        
 3   brand         100 non-null    object        
 4   os_version    100 non-null    object        
 5   is_root       100 non-null    int64         
 6   created_at    100 non-null    datetime64[ns]
dtypes: datetime64[ns](1), int64(2), object(4)
memory usage: 5.6+ KB


In [8]:
# summary statistics of the numeric columns of the sample
numeric_summary = df.describe(include=[np.number])
numeric_summary

Unnamed: 0,id,is_root
count,100.0,100.0
mean,50.5,0.06
std,29.011492,0.238683
min,1.0,0.0
25%,25.75,0.0
50%,50.5,0.0
75%,75.25,0.0
max,100.0,1.0


In [9]:
# summary statistics of the non-numeric columns of the sample
non_numeric_summary = df.describe(exclude=[np.number])
non_numeric_summary

Unnamed: 0,model,manufacturer,brand,os_version,created_at
count,100,100,100,100.0,100
unique,72,22,27,11.0,100
top,ALE-L21,samsung,samsung,7.0,2017-10-09 10:26:29
freq,8,28,28,34.0,1
first,,,,,2017-10-09 03:42:19
last,,,,,2017-10-09 11:14:12


## Basic data pre-processing

In [10]:
def save_df_page(page, chunks, collection, exclude=['id']):
    """Persist one page of processed data and return the next page number.

    The chunks are concatenated into a single frame, the page metadata is
    recorded through `update_page_metadata`, and the frame is written to
    `<model_storage>/<model>.<page>.parquet` with brotli compression.
    `model` and `model_storage` are read from the notebook's globals.

    Args:
        page: number of the page being written (used in the file name).
        chunks: list of DataFrames making up the page; must not be empty.
        collection: collection handed to `update_page_metadata`.
        exclude: currently unused by this function.

    Returns:
        `page + 1`.
    """
    # concatenating the chunks copies the data -> careful, benchmark this
    page_df = pd.concat(chunks, axis=0, ignore_index=True)

    # metadata goes to mongo before the page itself is written to disk
    update_page_metadata(collection, page_df)

    page_filename = '{}.{}.parquet'.format(model, page)
    page_path = os.path.join(model_storage, page_filename)
    page_df.to_parquet(page_path, compression='brotli')

    return page + 1

In [11]:
# target dtype of each column; created_at is not listed because read_csv
# already parses it as a datetime
mappings = {
    'id': 'uint32',
    'model': 'category',
    'manufacturer': 'category',
    'brand': 'category',
    'os_version': 'category',
    'is_root': 'bool',
}

# store the dtype mapping on the collection's metadata document
mappings_update = {'$set': {'mappings': mappings}}
model_metadata.update_one({}, mappings_update)

<pymongo.results.UpdateResult at 0x7f672a321ec0>

In [12]:
# running totals accumulated over every processed file
total_rows = 0
total_memory_usage = 0.0
last_id = 0
models_set = set()

# paging state: chunks buffered for the current page and their memory usage
page = 0
chunks = []
memory_usage_split = 0.0

# os_version values with a relative frequency below this are dropped
frequency_threshold = 0.001

In [13]:
# how many raw files the loop below will go through
csv_files_count = len(csv_files)
print('Total csv files: {}'.format(csv_files_count))

Total csv files: 2


In [14]:
# Clean every raw csv file and buffer the result in `chunks`; whenever the
# buffered data reaches config['MEMORY_USAGE_SPLIT'] it is flushed to a new
# parquet page. Chunks still buffered after the loop are flushed by the next
# cell.
for filepath in csv_files:
    # read os_version as text: a file whose versions are all numeric would
    # otherwise be inferred as float and break the string handling below
    df = pd.read_csv(filepath, sep=';', parse_dates=['created_at'],
                     dtype={'os_version': str})

    # rows without an os_version cannot be parsed; drop them up front because
    # the string operations below fail on NaN
    df = df.dropna(subset=['os_version'])

    # os_version: discard purely alphabetic values
    df = df[~df['os_version'].str.isalpha()]

    # slice the minor os_version number e.g. 8.0.1 -> 8.0
    # (columns are replaced with `assign` rather than `df.loc[:, col] = ...`:
    # on pandas >= 2.0 the latter writes in place and keeps the object dtype,
    # so the float os_version would later be treated as a string column)
    df = df.assign(
        os_version=['.'.join(x.split('.')[:2]) for x in df['os_version']])

    # drop low frequency values (frequencies are computed per file)
    value_counts = df['os_version'].value_counts(normalize=True)
    frequent_versions = value_counts[value_counts >= frequency_threshold].index
    df = df[df['os_version'].isin(frequent_versions)]

    df = df.assign(os_version=df['os_version'].map(float))

    # drop missing values
    df = df.dropna()

    string_columns = df.select_dtypes(include='object').columns.to_list()

    # basic string pre-processing: convert to lowercase and strip blank chars
    df = df.assign(**{column: [x.lower().strip() for x in df[column]]
                      for column in string_columns})

    df = df.astype(mappings)

    total_rows += df.shape[0]
    page_memory_usage = memory_usage(df)
    total_memory_usage += page_memory_usage
    memory_usage_split += page_memory_usage

    last_id = max(last_id, df['id'].max())
    models_set.update(df['model'].unique().to_list())

    # buffer the chunk BEFORE checking the threshold: otherwise the chunk that
    # crosses it is counted in the totals but never written to any page (and
    # an empty buffer would make pd.concat raise)
    chunks.append(df)

    if memory_usage_split >= config['MEMORY_USAGE_SPLIT']:
        next_page = save_df_page(page, chunks, model_metadata)
        print('Page {} created!'.format(page))
        page = next_page

        # start a fresh buffer for the next page
        chunks = []
        memory_usage_split = 0.0

    print('{} ✔'.format(filepath))

../../data/raw/devices/devices.query.1.csv ✔
../../data/raw/devices/devices.query.2.csv ✔


In [15]:
# Flush the chunks still buffered after the last file into a final page.
if chunks:
    page = save_df_page(page, chunks, model_metadata)

    # rebind instead of `del chunks`: the buffered frames are released just
    # the same, but the name stays defined so this cell and the loop above can
    # be re-run without a NameError
    chunks = []
    memory_usage_split = 0.0

In [16]:
# record the highest id seen so far on the metadata document; cast to a plain
# int first (presumably because last_id may be a numpy integer at this point)
last_id_update = {'$set': {'last_id': int(last_id)}}
model_metadata.update_one({}, last_id_update)

<pymongo.results.UpdateResult at 0x7f672b4b9d40>

In [17]:
# run summary: rows kept, accumulated memory usage and highest id seen
(total_rows, total_memory_usage, last_id)

(307175, 10.52, 307617)

In [18]:
# content = '**{}** | task: data pre-processing, rows: {}, memory usage: {} MB' \
# .format(model,total_rows, total_memory_usage)

# requests.post(config['discord_webhook_url'], data={'content': content})