In [116]:
import urllib.request
import zlib
import pandas as pd
import numpy as np
import json
import datetime

# Cleaning the dataset structure

For this test I will use the `2015-01-01-15.json.gz` file.

### Download and transform to a json form

In [182]:
# Date and hour of the GH Archive dump used throughout this notebook
# (they identify the '2015-01-01-15.json.gz' file).
year, month, day, hour = 2015, 1, 1, 15

The `User-Agent` header is important: without it, the server denies access.

In [4]:
def download_gharchive_data(year, month, day, hour):
    """Download one hourly GH Archive dump and return its raw (gzip) bytes.

    Parameters
    ----------
    year, month, day, hour : int
        Date and hour identifying the archive file. Month and day are
        zero-padded to two digits in the file name; the hour is inserted
        as given (e.g. ``2015-01-01-15.json.gz``).

    Returns
    -------
    bytes
        The still-compressed body of the HTTP response.

    Raises
    ------
    urllib.error.URLError
        Propagated from ``urllib.request.urlopen`` when the request fails
        (``HTTPError`` for non-success status codes).
    """
    url = f'http://data.gharchive.org/{year}-{month:02}-{day:02}-{hour}.json.gz'
    # An explicit browser-like User-Agent is sent; the server is reported to
    # refuse requests that arrive without one.
    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})

    # The context manager closes the connection even if read() raises,
    # instead of leaving the socket open until garbage collection.
    with urllib.request.urlopen(req) as response:
        return response.read()

- Decompress the data
- Since the file is not a single JSON list, it cannot be loaded directly with `json.loads` =>
=> split it by line and parse each line into a dictionary =>
=> convert the list of dictionaries into a pandas dataframe

In [None]:
def compressed_data_to_list_of_dicts(compressed_data):
    """Decompress a gzip payload of newline-delimited JSON into a list of dicts.

    Parameters
    ----------
    compressed_data : bytes
        Gzip-compressed, UTF-8 encoded text holding one JSON document per line.

    Returns
    -------
    list
        One parsed JSON value per non-blank line, in file order.

    Raises
    ------
    zlib.error
        If the payload is not valid gzip data.
    json.JSONDecodeError
        If a non-blank line is not valid JSON.
    """
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    decompressed_data = zlib.decompress(compressed_data, 16 + zlib.MAX_WBITS)
    text = decompressed_data.decode('utf-8')

    # Split on '\n' only: str.splitlines() would also break on characters
    # such as U+2028 that may legally appear inside a JSON string.
    # Whitespace-only lines (e.g. a lone '\r' from CRLF input, or the empty
    # tail after the final newline) are skipped rather than handed to
    # json.loads, which would raise on them.
    return [json.loads(line) for line in text.split('\n') if line.strip()]

In [183]:
# Fetch the configured hourly dump, then parse it into one dict per line.
compressed_data = download_gharchive_data(year=year, month=month, day=day, hour=hour)
json_decoded_lines = compressed_data_to_list_of_dicts(compressed_data=compressed_data)

NameError: name 'download_gharchive_data' is not defined

### Implement the schema

I based my schema on https://github.com/igrigorik/gharchive.org/blob/master/bigquery/schema.js

- Pandas dataframe
- created_at column to datetime
- columns with a fixed maximum set of json keys (`actor`, `repo`, `org`) are expanded
into table columns named with the pattern `colname_key`
- columns with a variable json format (`payload`, `other`) have been left as dictionaries
- After this, the dataframe is checked against the schema:
    - Check the columns: 
        - add missing columns
        - drop those that should not exist (this sometimes happens when expanding the jsons)
    - Apply the dtypes in the schema into the dataframe
    
    Note: `org_id` is sometimes NaN. Therefore, a nullable integer dtype has been used


In [167]:
# Target schema: maps each output column name to the dtype it is cast to.
# The expanded columns follow the `<source column>_<json key>` naming used
# when the `actor`, `repo` and `org` dictionaries are flattened.
extended_schema_dtypes = {
    'id': 'object', 
    'type': 'object', 
    # --- expanded from `actor` ---
    'actor_id': np.int64,
    'actor_login': 'object',
    'actor_gravatar_id': 'object',
    'actor_avatar_url': 'object',
    'actor_url': 'object',
    # --- expanded from `repo` ---
    'repo_id': np.int64,
    'repo_name': 'object', 
    'repo_url': 'object', 
    # Variable-shape json, kept as a dictionary rather than expanded.
    'payload': 'object', 
    'public': 'bool',
    # Timezone-aware (UTC) timestamps.
    'created_at': 'datetime64[ns, UTC]', 
    # --- expanded from `org` ---
    # Nullable integer dtype, because `org_id` is sometimes missing (NaN).
    'org_id': pd.Int64Dtype(), 
    'org_login': 'object',
    'org_gravatar_id': 'object',
    'org_avatar_url': 'object',
    'org_url': 'object',
    # Variable-shape json, kept as-is.
    'other': 'object'}

In [168]:
# Build one row per event and parse `created_at` in a single idempotent step.
# - `assign` with bracket access replaces the attribute-style assignment
#   (`df.created_at = ...`), which pandas discourages for setting columns.
# - `utc=True` guarantees a timezone-aware UTC column, matching the
#   'datetime64[ns, UTC]' dtype the schema declares for `created_at`, even if
#   a timestamp string carries no explicit offset.
df = pd.DataFrame(json_decoded_lines).assign(
    created_at=lambda events: pd.to_datetime(events['created_at'], utc=True)
)

In [169]:
def expand_dictionary_df_columns(df, dict_cols):
    """Flatten dictionary-valued columns into one column per dictionary key.

    Each column named in ``dict_cols`` is replaced by columns called
    ``<column>_<key>`` (object dtype). The input frame is not modified.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose ``dict_cols`` hold dictionaries (or missing values).
    dict_cols : list of str
        Names of the columns to expand. Names absent from ``df`` are skipped,
        so a dump in which no row has e.g. an ``org`` does not raise.

    Returns
    -------
    pandas.DataFrame
        The non-dictionary columns of ``df`` followed by the expanded columns,
        in ``dict_cols`` order, sharing the index of ``df``.
    """
    present_cols = [col for col in dict_cols if col in df.columns]

    expanded_frames = []
    for col in present_cols:
        # Cells that are not dictionaries (e.g. NaN for an absent `org`) are
        # treated as empty records: they give NaN in every expanded column
        # instead of producing a spurious `<col>_0` column.
        records = [cell if isinstance(cell, dict) else {} for cell in df[col]]
        # Building the frame once from the list of records avoids creating a
        # pandas Series per row, as `.apply(pd.Series)` does.
        expanded = pd.DataFrame(records, index=df.index, dtype='object')
        expanded_frames.append(expanded.add_prefix(col + '_'))

    not_dict_cols_df = df.loc[:, ~df.columns.isin(dict_cols)]
    return pd.concat([not_dict_cols_df, *expanded_frames], axis=1)

In [170]:
# Columns whose json objects have a fixed set of keys and are therefore flattened.
dict_cols = ['actor', 'repo', 'org']
new_df = expand_dictionary_df_columns(df=df, dict_cols=dict_cols)

In [172]:
def _add_missing_columns(df, schema):
    missing_columns = [col for col in schema.keys() if col not in df.columns]
    for col in missing_columns:
        df[col] = pd.Series(dtype=schema[col], name=col)
    return df

In [173]:
def _drop_columns_not_in_schema(df, schema):
    columns_to_drop = [col for col in df.columns if col not in schema.keys()]
    df = df.drop(columns=columns_to_drop, axis=1)
    return df

In [174]:
def _adapt_columns_to_schema(df, schema):
    """Make the column set of ``df`` match the schema.

    Missing schema columns are added first, then columns the schema does not
    list are dropped. Column order is not rearranged.
    """
    with_all_columns = _add_missing_columns(df, schema)
    return _drop_columns_not_in_schema(with_all_columns, schema)

In [175]:
def _apply_schema_types_to_df_columns(df, schema):
    for col, dtype in schema.items():
        if df[col].dtype != dtype:
            try:
                df[col] = df[col].astype(dtype)
            except ValueError:
                print(f'{col} cannot be casted to {dtype}')
    return df

In [None]:
def convert_df_to_schema(df, schema):
    """Return a copy of ``df`` whose columns and dtypes follow ``schema``.

    The copy first has its column set aligned with the schema (missing columns
    added, extra ones dropped) and then has each column cast to its schema
    dtype. The frame passed in is left untouched.
    """
    return (
        df.copy()
        .pipe(_adapt_columns_to_schema, schema)
        .pipe(_apply_schema_types_to_df_columns, schema)
    )

In [180]:
# Conform the expanded frame to the schema, then persist it as parquet.
cleaned_df = convert_df_to_schema(df=new_df, schema=extended_schema_dtypes)
cleaned_df.to_parquet(path='test.parquet')

In [181]:
# Read the file back to check that the cleaned frame survives the parquet round trip.
pd.read_parquet(path='test.parquet')

Unnamed: 0,id,type,payload,public,created_at,actor_id,actor_login,actor_gravatar_id,actor_url,actor_avatar_url,repo_id,repo_name,repo_url,org_id,org_login,org_gravatar_id,org_url,org_avatar_url,other
0,2489626364,DeleteEvent,"{'action': None, 'before': None, 'comment': No...",True,2015-01-01 14:00:00+00:00,1917297,mooring,,https://api.github.com/users/mooring,https://avatars.githubusercontent.com/u/1917297?,28670073,mooring/smarthost,https://api.github.com/repos/mooring/smarthost,,,,,,
1,2489626365,PushEvent,"{'action': None, 'before': '3ca405ae3d5d899b09...",True,2015-01-01 14:00:00+00:00,1824391,ramirovjr,,https://api.github.com/users/ramirovjr,https://avatars.githubusercontent.com/u/1824391?,28444982,ramirovjr/devops,https://api.github.com/repos/ramirovjr/devops,,,,,,
2,2489626366,PushEvent,"{'action': None, 'before': 'ebe5f18667b34f7787...",True,2015-01-01 14:00:00+00:00,3101557,mehdisadeghi,,https://api.github.com/users/mehdisadeghi,https://avatars.githubusercontent.com/u/3101557?,25783700,mehdisadeghi/mehdix.ir,https://api.github.com/repos/mehdisadeghi/mehd...,,,,,,
3,2489626368,DeleteEvent,"{'action': None, 'before': None, 'comment': No...",True,2015-01-01 14:00:00+00:00,844208,annando,,https://api.github.com/users/annando,https://avatars.githubusercontent.com/u/844208?,3302872,annando/friendica,https://api.github.com/repos/annando/friendica,,,,,,
4,2489626369,PushEvent,"{'action': None, 'before': '410e23462261c778e4...",True,2015-01-01 14:00:00+00:00,963332,srjohn,,https://api.github.com/users/srjohn,https://avatars.githubusercontent.com/u/963332?,28641735,srjohn/johnson.net,https://api.github.com/repos/srjohn/johnson.net,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
10302,2489651033,PushEvent,"{'action': None, 'before': '8f315f0232332f05ab...",True,2015-01-01 14:59:58+00:00,10358743,weshellnet,,https://api.github.com/users/weshellnet,https://avatars.githubusercontent.com/u/10358743?,28670754,weshellnet/weshellnet.github.io,https://api.github.com/repos/weshellnet/weshel...,,,,,,
10303,2489651038,CreateEvent,"{'action': None, 'before': None, 'comment': No...",True,2015-01-01 14:59:59+00:00,7693186,ashishparkhi,,https://api.github.com/users/ashishparkhi,https://avatars.githubusercontent.com/u/7693186?,27871780,IDeaSCo/rockstar,https://api.github.com/repos/IDeaSCo/rockstar,8440704,IDeaSCo,,https://api.github.com/orgs/IDeaSCo,https://avatars.githubusercontent.com/u/8440704?,
10304,2489651040,CreateEvent,"{'action': None, 'before': None, 'comment': No...",True,2015-01-01 14:59:59+00:00,7732667,lihechao,,https://api.github.com/users/lihechao,https://avatars.githubusercontent.com/u/7732667?,28688589,lihechao/scheduler,https://api.github.com/repos/lihechao/scheduler,,,,,,
10305,2489651041,PushEvent,"{'action': None, 'before': '1a71a15c719d358d2d...",True,2015-01-01 14:59:59+00:00,667327,sam-d,,https://api.github.com/users/sam-d,https://avatars.githubusercontent.com/u/667327?,18569518,sam-d/sam-d.github.io,https://api.github.com/repos/sam-d/sam-d.githu...,,,,,,
