# MSBD5003 Group Project - preprocessing

In this part, we process the csv files as they are corrupted.

1. `preprocess/preprocess.py` (no spark, pandas parallel version), `preprocess/preprocess_spark.py` (spark version)
2. data is loaded, selected, joined (category_title), removed null, detect langugage
3. preprocessed data is saved into `data/processed`

Features:
1. video_id
2. title
3. category_id
4. tags
5. views
6. likes
7. dislikes
8. comment_count
9. description
10. category_title
11. region*
12. lang**

\* region: CA, DE, FR, GB, IN, JP, KR, MX, RU, US <br>
** lang: see [wiki](https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes) for lang code

### import libs

In [1]:
try:
    sc
except:
    print("That is not SparkContext. Initializing SparkContext")
    from pyspark import SparkContext
    sc = SparkContext("local", "preprocessing")
try:
    import langdetect
except:
    print("There is no langdetect, installing")
    !pip install langdetect
    import langdetect
import json
import pandas as pd
import os, shutil
from time import time
from io import StringIO
import csv
import numpy as np
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import *

In [2]:
## function to read
features = ['video_id', 'title', 'category_id', 'tags', 'views', 'likes', 'dislikes', 'comment_count',
            'description']
regions = ['CA', 'DE', 'FR', 'GB', 'IN', 'JP', 'KR', 'MX', 'RU', 'US']
count = 0

## single_process

In [3]:
def line_by_line_parse(txt: str):
    """
    Read the csv file line by line
    remove the lines that are not properly formatted
    :param txt:
    :return: (data, number of record removed)
    """
    lines = txt.split('\n')
    ok_lines = []
    ok_line_indexs = []
    fail_lines = []
    fail_line_indexs = []
    for i, line in enumerate(lines):
        f = StringIO(line)
        c = list(csv.reader(f))
        if len(c) > 1:
            raise ValueError('Number of lines of a line > 1')
        elif len(c) <= 0:
            fail_lines.append(c)
            fail_line_indexs.append(i)
            continue
        c = c[0]
        if len(c) == 16:
            ok_lines.append(c)
            ok_line_indexs.append(i)
        else:
            fail_lines.append(c)
            fail_line_indexs.append(i)
    fail_line_indexs = np.array(fail_line_indexs)
    invalid_line_indexs = [i[0] - 1 for i in
                           np.split(fail_line_indexs, np.where(np.diff(fail_line_indexs) != 1)[0] + 1)]
    for _ind in invalid_line_indexs:
        ind = ok_line_indexs.index(_ind)
        ok_line_indexs.pop(ind)
        ok_lines.pop(ind)
    data = pd.DataFrame(ok_lines[1:], columns=ok_lines[0])
    return data, len(invalid_line_indexs)


def drop_invalid_byte(buf, offset=0, calls=0) -> tuple:
    """

    :param buf:
    :param offset:
    :param calls:
    :return: text, number of byte removed,
    """
    start = 0
    end = len(buf)
    try:
        # print(f"Extracting from {start+offset}:{end+offset}")
        temp = buf[start:end].decode('utf-8-sig')
        return temp, calls
    except UnicodeDecodeError as err:
        ERR = err
        # print(ERR)
        if ERR.reason == 'invalid continuation byte':
            tailstr, tail_calls = drop_invalid_byte(buf[ERR.end + 1: end], ERR.end + 1)
            return buf[start:ERR.start].decode('utf-8-sig') + tailstr, calls + 1 + tail_calls
        elif ERR.reason == 'invalid start byte':
            return buf[start + 1: end], calls + 1
        elif ERR.reason == 'unexpected end of data':
            return buf[start: end - 1], calls + 1
    except Exception as err:
        raise err


def safe_read_csv(csv_name: str) -> pd.DataFrame:
    try:
        return pd.read_csv(csv_name, encoding='utf-8-sig')
    except UnicodeDecodeError:
        f = open(csv_name, 'rb')
        raw_buffer = f.read()
        f.close()
        txt, nbytes = drop_invalid_byte(raw_buffer)
        print(f"Removed {nbytes} invalid bytes")
        f = StringIO(txt)
        try:
            return pd.read_csv(f)
        except:
            data, num_of_invalid_lines = line_by_line_parse(txt)
            print(f"Removed {num_of_invalid_lines} records in {csv_name}")
            return data

def detect_language(x):
    import langdetect
    try:
        return langdetect.detect(x)
    except:
        return ''
detect_language_udf = udf(detect_language, StringType())

In [4]:
for region in regions:
    if os.path.isfile(f'../data/processed/spark/{region}.csv'):
        os.remove(f'../data/processed/spark/{region}.csv')
    elif os.path.isdir(f'../data/processed/spark/{region}.csv'):
        shutil.rmtree(f'../data/processed/spark/{region}.csv')
    print(f"Start process {region}")
    start_time = time()
    csv_name = f'../data/raw/{region}videos.csv'
    json_name = f'../data/raw/{region}_category_id.json'
    data = safe_read_csv(csv_name)[features].astype(
                    {'video_id': str, 'title': str, 'category_id': int, 'tags': str, 'views': int, 'likes': int,
                     'dislikes': int, 'comment_count': int, 'description': str})
    category_data = pd.DataFrame(list(
                    map(lambda x: {'category_id': int(x['id']), 'category_title': x['snippet']['title']},
                        json.load(open(json_name, 'r'))['items'])))
    data_sp = spark.createDataFrame(data)
    cat_data_sp = spark.createDataFrame(category_data)
    data_sp = data_sp.join(cat_data_sp, 'category_id')
    data_sp = data_sp.withColumn('region', lit(region)).cache()
    data_sp = data_sp.select('*', detect_language_udf(data_sp['title']).alias('lang'))
    data_sp.write.csv(f'../data/processed/spark/{region}.csv')
    print(f"Finished process {region} (with {data_sp.count()} records) in {time() - start_time:.2f}s")

Start process CA
Finished process CA (with 40807 records) in 164.13s
Start process DE
Finished process DE (with 40584 records) in 145.11s
Start process FR
Finished process FR (with 40610 records) in 170.29s
Start process GB
Finished process GB (with 38826 records) in 197.18s
Start process IN
Finished process IN (with 37247 records) in 192.06s
Start process JP
Removed 9 invalid bytes
Finished process JP (with 20505 records) in 32.13s
Start process KR
Removed 66 invalid bytes
Removed 1 records in ../data/raw/KRvideos.csv
Finished process KR (with 34279 records) in 43.76s
Start process MX
Removed 3 invalid bytes
Finished process MX (with 40197 records) in 197.89s
Start process RU
Removed 43 invalid bytes
Removed 9 records in ../data/raw/RUvideos.csv
Finished process RU (with 39183 records) in 119.50s
Start process US
Finished process US (with 40949 records) in 122.16s
