Import all necessary libraries

In [26]:
import os
import re
from pyarrow import json
import pyarrow.parquet as pq

Define our global variables.
`TEMPORAL_DIR` is the temporary landing zone where raw files that still need to be processed are placed.
`PERSISTENT_DIR` is the location where files converted to the selected file format are stored.

In [27]:
# Temporary landing zone: raw files waiting to be converted are read from here.
TEMPORAL_DIR = '../data/raw'
# Root of the persistent zone: converted files are written beneath this directory.
PERSISTENT_DIR = '../data/processed'

Here we create a simple function that will convert a JSON file into a parquet file, and place the converted file into the appropriate location

In [28]:
def convert_json_to_parquet(input_filename, input_dir, output_filename, output_dir):
    '''
    Convert one JSON file into a parquet file.

    The JSON file is loaded into a pyarrow table with ``json.read_json``
    (``pyarrow.json``) and written back out with ``pq.write_table``
    (``pyarrow.parquet``). The output directory is not created here; it must
    already exist when this function is called.

    :param input_filename: filename (including extension) of the JSON file to convert
    :param input_dir: directory where the JSON file exists
    :param output_filename: filename that will be given to the converted parquet file
                            (used verbatim, no extension is appended)
    :param output_dir: directory where the parquet file should be placed after conversion
    :return: None
    '''
    # Build paths with os.path.join rather than manual '/' concatenation, so an
    # empty directory means "current directory" (not the filesystem root) and a
    # trailing separator on the directory does not produce a doubled slash.
    input_path = os.path.join(input_dir, input_filename)
    output_path = os.path.join(output_dir, output_filename)

    table = json.read_json(input_path)
    pq.write_table(table, output_path)

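First we extract the primary metadata (year, month, user ID, task ID and file extension) from each filename as received from the website, and use it to decide where the converted file is placed and what it is called.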

In [32]:
# Walk the landing zone and route every raw file into PERSISTENT_DIR/YYYY/MM/userID-taskID.
for filename in os.listdir(TEMPORAL_DIR):  # iterate over every entry in the landing zone
    if filename.startswith('.'):  # do not process hidden files that start with "."
        continue

    # Split the filename on '-' and '.'. The indices used below imply at least six
    # parts: [0]=YYYY, [1]=MM, [2]=not used here (presumably the day - TODO confirm),
    # [3]=userID, [4]=taskID, [5]=file extension.
    metadata = re.split('[-.]', filename)
    if len(metadata) < 6:
        # A file that does not follow the naming scheme would otherwise raise
        # IndexError below and abort the whole batch; skip it and keep going.
        print(f"Skipping '{filename}': filename does not match the expected pattern")
        continue

    file_directory = f"{PERSISTENT_DIR}/{metadata[0]}/{metadata[1]}"  # uses YYYY/MM as the name of the sub-directory
    new_filename = f"{metadata[3]}-{metadata[4]}"  # new file name will be userID-taskID

    # exist_ok=True creates the directory only when it is missing, replacing the
    # separate os.path.exists() check.
    os.makedirs(file_directory, exist_ok=True)

    if metadata[5] == "json":
        convert_json_to_parquet(filename, TEMPORAL_DIR, new_filename, file_directory)
    elif metadata[5] == "csv":
        print("This is where Vlada's function will be placed")  # TODO: Replace with Vlada's function to convert from CSV to parquet