### **DATA PERSISTENCE LOADER**

Import the necessary packages

In [None]:
# Avro
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

import subprocess
import pandas as pd
from io import StringIO
from datetime import datetime
import json
import re
import os

Set path of project

In [None]:
# Base directory of the project on the local machine (note the trailing
# slash: local file names are appended to it by plain concatenation).
path = (
    'D:/Master/BDMA/Courses/Semester_2/'
    'Big_Data_Management/Project/dataimporta/'
)

Check how many files need to be uploaded into the persistent zone (the count covers all countries)

In [None]:
# HDFS command to find the count of directories and files inside a directory.
# `hadoop fs -count` prints one line: DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
count_result = subprocess.run('hadoop fs -count /temporal', capture_output=True, shell=True)
count_output = count_result.stdout.decode()
# From the answer, capture the second of the three numbers that precede the
# path, i.e. the count of files (note that this is the count for ALL countries)
count_match = re.search(r'\d*\s*(\d*)\s*\d*\s/', count_output)
if count_match is None:
    # Nothing parseable on stdout (hadoop missing, path missing, ...): fail
    # with hadoop's own message instead of an opaque IndexError.
    raise RuntimeError(
        'Could not read the file count of /temporal: '
        + count_result.stderr.decode(errors='replace').strip()
    )
# Kept as a string, it is only used for the status message below
count_files = count_match.group(1)
print('# of files to upload: '+count_files)

Countries to upload

In [None]:
# Every country whose data is handled by this loader
countries = [
    "peru",
    "chile",
    "brazil",
]
# Subset of the countries above whose metadata is kept separately for
# imports and for exports
countries_meta = [
    "chile",
]

Check which directories have data in the temporal landing zone and get those file names

In [None]:
def _list_hdfs_file_names(hdfs_dir):
    """Return the names of the entries that `hadoop fs -ls` prints for *hdfs_dir*.

    The result is an empty list when stdout contains no path lines.
    """
    listing = subprocess.run('hadoop fs -ls '+hdfs_dir, capture_output=True, shell=True).stdout.decode()
    # Every entry line ends with the full path; keep what follows its last "/".
    # Stopping at "\r" or "\n" makes the parsing work for CRLF and LF output
    # alike (the previous pattern only matched lines terminated by "\r").
    return re.findall(r'/.*/([^\r\n]*)', listing)


# Dictionary to store the file names found in the temporal landing zone:
#   filenames[country]["imports"]  -> list of file names
#   filenames[country]["exports"]  -> list of file names
#   filenames[country]["metadata"] -> list of file names, or
#                                     {"imports": [...], "exports": [...]} for
#                                     the countries listed in countries_meta
filenames = {}

# Iterate through each country directory
for country in countries:
    temporal_root = '/temporal/'+country

    imports_files = _list_hdfs_file_names(temporal_root+'/imp')
    exports_files = _list_hdfs_file_names(temporal_root+'/exp')

    if country in countries_meta:
        # Countries that have different metadata for imports and exports
        country_metadata = {
            "imports": _list_hdfs_file_names(temporal_root+'/metadata/imp'),
            "exports": _list_hdfs_file_names(temporal_root+'/metadata/exp'),
        }
    else:
        # Countries that have the same metadata for imports and exports
        country_metadata = _list_hdfs_file_names(temporal_root+'/metadata')

    filenames[country] = {
        "imports": imports_files,
        "exports": exports_files,
        "metadata": country_metadata,
    }

Now that we have the files to be uploaded, we need to determine which year each of them corresponds to

In [None]:
def _year_from_name(pattern, filename):
    """Return the first capture group of *pattern* found in *filename*.

    Raises:
        ValueError: if *pattern* does not occur in *filename*, so a badly
            named file is reported by name instead of failing with an
            AttributeError on ``None``.
    """
    found = re.search(pattern, filename)
    if found is None:
        raise ValueError('Cannot extract the year from file name: '+filename)
    return found.group(1)


# Per-country rule that turns a data file name into its year string.
# The dots before the extensions are escaped so they match a literal ".",
# and Chile requires at least one digit so an empty year cannot be returned.
year_extractors = {
    # Peru: the two characters right before ".csv", prefixed with "20"
    "peru": lambda filename: '20'+_year_from_name(r'(.{2})(?=\.csv)', filename),
    # Chile: the digits right before ".txt"
    "chile": lambda filename: _year_from_name(r'(\d+)(?=\.txt)', filename),
    # Brazil: four digits enclosed by underscores
    "brazil": lambda filename: _year_from_name(r'_(\d{4})_', filename),
}

# Dictionary with the file names and the corresponding years; it mirrors the
# layout of `filenames`, with {file name: year} dicts in place of the lists
filenames_year = {}
for country in countries:
    split_metadata = country in countries_meta
    # For countries that have different metadata for imports and exports
    if split_metadata:
        filenames_year[country] = {"imports":{}, "exports":{}, "metadata":{"imports":{}, "exports":{}}}
    # For countries that have the same metadata for imports and exports
    else:
        filenames_year[country] = {"imports":{}, "exports":{}, "metadata":{}}

    extract_year = year_extractors.get(country)
    if extract_year is None:
        # No naming rule defined for this country: leave its entries empty
        continue

    # Imports and exports: the year is taken from the file name
    for flow in ("imports", "exports"):
        for filename in filenames[country][flow]:
            filenames_year[country][flow][filename] = extract_year(filename)

    # Metadata: the files are assigned the current year
    current_year = datetime.now().date().strftime("%Y")
    if split_metadata:
        for flow in ("imports", "exports"):
            for filename in filenames[country]["metadata"][flow]:
                filenames_year[country]["metadata"][flow][filename] = current_year
    else:
        for filename in filenames[country]["metadata"]:
            filenames_year[country]["metadata"][filename] = current_year
            


Get the data years by country:

In [None]:
# Map every country to the list of distinct years found among its files
years = {}
for country in countries:
    by_kind = filenames_year[country]

    # Collect the year of every file: imports, then exports, then metadata
    found_years = list(by_kind["imports"].values())
    found_years.extend(by_kind["exports"].values())
    if country in countries_meta:
        # metadata is split into imports / exports for these countries
        found_years.extend(by_kind["metadata"]["imports"].values())
        found_years.extend(by_kind["metadata"]["exports"].values())
    else:
        found_years.extend(by_kind["metadata"].values())

    # Drop the duplicated years
    years[country] = list(set(found_years))
years


Check if the persistent zone already has the corresponding "years" directories to upload the files into; if not, create them:

In [None]:
def _hdfs_child_names(hdfs_dir):
    """Return the names of the entries that `hadoop fs -ls` prints for *hdfs_dir*."""
    listing = subprocess.run('hadoop fs -ls '+hdfs_dir, capture_output=True, shell=True).stdout.decode()
    # Keep what follows the last "/" of each listed path; excluding "\r" and
    # "\n" handles both CRLF and LF terminated output.
    return re.findall(r'/.*/([^\r\n]*)', listing)


for country in countries:
    # Directories of the persistent zone that hold one sub-directory per year
    parent_dirs = ['/persistent/'+country+'/imp', '/persistent/'+country+'/exp']
    if country in countries_meta:
        # metadata is split into imports / exports for these countries
        parent_dirs.append('/persistent/'+country+'/metadata/imp')
        parent_dirs.append('/persistent/'+country+'/metadata/exp')
    else:
        parent_dirs.append('/persistent/'+country+'/metadata')

    for parent_dir in parent_dirs:
        # List the PARENT directory (once): its children are the year
        # directories that already exist. Listing the year directory itself
        # would return its files, so the year would never be found and mkdir
        # would be attempted every time.
        existing_years = set(_hdfs_child_names(parent_dir))

        for year in years[country]:
            # if the year directory does not exist in the persistent folder, create it
            if year in existing_years:
                continue
            year_dir = parent_dir+'/'+year
            created = subprocess.run('hadoop fs -mkdir '+year_dir, capture_output=True, shell=True)
            if created.returncode != 0:
                # Report the failure instead of silently ignoring it
                print('Could not create '+year_dir+': '+created.stderr.decode(errors='replace').strip())

Now let's start the upload. Since we will save the files in AVRO format, they must be first converted and only then uploaded to the corresponding directory. Each country will have a different upload pipeline, given the differences of the data

In [None]:
def _version_number(avro_name):
    """Return the integer N of a file name ending in ``N.avro``, or None if there is none."""
    found = re.search(r'(\d+)\.avro$', avro_name)
    return int(found.group(1)) if found else None


def _temporal_csv_as_records(hdfs_file):
    """Read a CSV file from HDFS.

    Returns:
        A tuple ``(columns, records)``: the column names of the file and one
        dict per row in which every value has been converted to ``str``.
    """
    raw = subprocess.run('hadoop fs -cat '+hdfs_file, capture_output=True, shell=True, encoding="latin-1").stdout
    frame = pd.read_csv(StringIO(raw))
    # ensure all the data are strings (the AVRO schema declares every field as string)
    records = [{key: str(value) for key, value in row.items()} for row in frame.to_dict('records')]
    return list(frame.columns), records


def _write_avro(local_file, schema, records):
    """Write *records* into a new local AVRO file, closing it even if a record is rejected."""
    writer = DataFileWriter(open(local_file, "wb"), DatumWriter(), schema)
    try:
        for record in records:
            writer.append(record)
    finally:
        writer.close()


for country in countries:

    # initial file name inside a directory:
    initial_name = "version0.avro"

    if country == 'peru':

        # paths to folders:
        folders = {"imports":"imp", "exports":"exp", "metadata":"metadata"}

        for folder, folderpath in folders.items():
            # imports or exports (the "metadata" folder is not processed here)
            if folder == 'imports' or folder == 'exports':

                for filename, year in filenames_year['peru'][folder].items():

                    # Print status
                    print("Working on: "+country+" | "+folder+" | "+filename+'...')

                    temporal_file = '/temporal/'+country+'/'+folderpath+'/'+filename
                    persistent_dir = '/persistent/'+country+'/'+folderpath+'/'+year

                    # Check if there is any persistent file to append the temporal file's data into
                    # (e.g. the most recent version). If there is none, version0.avro is created.
                    all_versions = subprocess.run('hadoop fs -ls '+persistent_dir, capture_output=True, shell=True).stdout.decode()
                    # Keep only the listed names that carry a version number; the
                    # pattern accepts both CRLF and LF terminated listings.
                    all_versions = [
                        name for name in re.findall(r'/.*/([^\r\n]*)', all_versions)
                        if _version_number(name) is not None
                    ]

                    # convert the temporal file to records in AVRO-ready form
                    columns, new_records = _temporal_csv_as_records(temporal_file)

                    if not all_versions:
                        # No persistent file: build a schema from the column names,
                        # declaring every field as a string
                        fields = [{"name": col, "type": "string"} for col in columns]
                        schema = {
                            "doc": datetime.now().strftime("%m/%d/%Y, %H:%M:%S"),
                            "name": "trade_item",
                            "namespace": country+"_"+folder,
                            "type": "record",
                            "fields": fields
                        }
                        schema = avro.schema.parse(json.dumps(schema))
                        avro_records = new_records
                        target_name = initial_name
                        success_message = 'First version created: version0.avro file was created in '+persistent_dir+'/'
                        failure_message = 'An error occured, upload not performed'

                    else:
                        # There is a previous version: pick the most recent one.
                        # The numbers are compared as integers; compared as
                        # strings, "10" would sort before "9".
                        most_recent_version = max(all_versions, key=_version_number)

                        # retrieve that version's file from hadoop
                        # NOTE(review): a list combined with shell=True only behaves as
                        # intended on Windows (see the subprocess docs) - confirm before
                        # running this on another platform.
                        old_avro = subprocess.run(['hadoop', 'fs', '-get', persistent_dir+'/'+most_recent_version, path], capture_output=True, shell=True)
                        if old_avro.returncode != 0:
                            # Without the previous version nothing can be appended
                            print('An error occured, update not performed')
                            continue

                        # get old records and old schema, then delete the local copy
                        reader = DataFileReader(open(path+most_recent_version, "rb"), DatumReader())
                        try:
                            avro_records = list(reader)
                            schema = json.loads(reader.schema)
                        finally:
                            reader.close()
                            os.remove(path+most_recent_version)

                        # modify schema: refresh its timestamp
                        schema["doc"] = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
                        schema = avro.schema.parse(json.dumps(schema))

                        # append the new records into the old ones
                        avro_records.extend(new_records)

                        # define name for the AVRO file to upload
                        target_name = "version"+str(_version_number(most_recent_version)+1)+'.avro'
                        success_message = 'Update performed: '+target_name+' file was created in '+persistent_dir+'/'
                        failure_message = 'An error occured, update not performed'

                    # create the AVRO file in the local system and upload it into HDFS;
                    # the local file is removed even when writing or uploading fails
                    local_file = path+target_name
                    try:
                        _write_avro(local_file, schema, avro_records)
                        load_hdfs = subprocess.run(['hadoop', 'fs', '-put', local_file, persistent_dir], capture_output=True, shell=True)
                    finally:
                        if os.path.exists(local_file):
                            os.remove(local_file)

                    # print status
                    if load_hdfs.returncode == 0:
                        print(success_message)
                        # TODO: delete the file from the temporal landing zone once it
                        # has been passed to the persistent zone
                    else:
                        print(failure_message)

