# Introduction to Preliminary Global Extraction, Transformation, and Loading (ETL) Process
In this notebook, we embark on the initial stages of the Extraction, Transformation, and Loading (ETL) process by retrieving data from a client-provided source. This extracted data will form the foundation for subsequent analysis and processing, ultimately guiding us to deliver the final product in alignment with the client's requirements. To streamline this process, we rely on three main third-party libraries: `gdown` to download the files from Google Drive, `pandas` to read and organize them, and `pyarrow` to write them in Parquet format.

Our approach prioritizes clarity and simplicity in the implementation of code. We achieve this by minimizing the number of action cells and focusing on concise, well-documented functions. Where necessary, we provide detailed comments to facilitate understanding and collaboration throughout the ETL process.

The structure of this notebook is modular, taking inspiration from the Model-View-Controller (MVC) design pattern. The sections are organized as follows:

1. **Data Extraction Function**: This section defines the function responsible for extracting data from the client's Google Drive folder and transferring it to the lakehouse.

2. **Transform Functions**: Here, we define the functions that convert the extracted data into `.parquet` files, a columnar format that is more compact and faster to query than the original CSV, JSON, and XLSX files.

3. **Load Data**: In this part, we define the function responsible for loading the data, in Parquet format, into the lakehouse.

We refer to this systematic approach as the Library-Action-View (LAV) paradigm, reflecting our commitment to efficiency and organization in executing the ETL process.
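The three stages above can be pictured as independent functions chained by a small driver. The sketch below is a toy stand-in rather than the notebook's code: it uses an in-memory CSV string instead of the Google Drive download and writes JSON Lines instead of Parquet, so that it runs with the standard library alone. The names `extract`, `transform`, and `load` and the sample rows are hypothetical.

```python
import csv
import io
import json

def extract(raw_text: str) -> list[dict]:
    """Extract: parse raw CSV text into row dicts (stands in for the Drive download)."""
    return list(csv.DictReader(io.StringIO(raw_text)))

def transform(rows: list[dict]) -> list[dict]:
    """Transform: cast 'price' to float, keeping non-numeric labels as None."""
    out = []
    for row in rows:
        try:
            price = float(row["price"])
        except ValueError:
            price = None
        out.append({**row, "price": price})
    return out

def load(rows: list[dict], sink: io.StringIO) -> int:
    """Load: write one JSON object per line (stands in for the Parquet write)."""
    for row in rows:
        sink.write(json.dumps(row) + "\n")
    return len(rows)

# Hypothetical toy input; 'Free To Play' echoes a value seen later in this notebook
raw = "name,price\nGame A,9.99\nGame B,Free To Play\n"
sink = io.StringIO()
count = load(transform(extract(raw)), sink)
print(count)  # 2
```

Keeping each stage a function of its input alone makes it possible to re-run a single stage (for example, the Parquet conversion) without downloading the files again.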

In [1]:
%pip install gdown


Collecting gdown
  Downloading gdown-5.2.0-py3-none-any.whl (18 kB)
Installing collected packages: gdown
Successfully installed gdown-5.2.0
Note: you may need to restart the kernel to use updated packages.


In [2]:
%pip install pyarrow


Note: you may need to restart the kernel to use updated packages.


In [3]:
import io
import os
import sys
import pandas as pd
import ast
import json
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
#import builtin.utils as ut
import gdown


In [4]:
def download_from_drive_folder(drive_folder_id, main_folder_path):
    # Build the shared-folder URL from the folder ID passed as a parameter
    url = f"https://drive.google.com/drive/folders/{drive_folder_id}"

    # Download the folder content into main_folder_path
    gdown.download_folder(url, output=main_folder_path, quiet=False, use_cookies=False)

# Parameters
drive_id = '1hVEIP_DpRx-l-vRYlJiT6AGe7oZb1Plg'  # ID of the Drive folder (the one in the shared-folder URL used for the run below)
main_folder_path = '/lakehouse/default/Files/Original'  # Destination folder path

# Call the function "download_from_drive_folder"
download_from_drive_folder(drive_id, main_folder_path)


Retrieving folder contents
Retrieving folder contents completed
Building directory structure
Building directory structure completed
Downloading...
From: https://drive.google.com/uc?id=1sAI6dJ92gbKGIrGIfg1kqexcOPwfaoAL
To: /lakehouse/default/Files/Original/Console_sales.xlsx
100%|██████████| 12.7k/12.7k [00:00<00:00, 30.2MB/s]
Downloading...
From: https://drive.google.com/uc?id=1j2U0umBKidNL9AT8MnGLNpq3IqeujGk_
To: /lakehouse/default/Files/Original/Indicadores_del_desarrollo_humano_mundial Banco Mundial.xlsx
100%|██████████| 85.3k/85.3k [00:00<00:00, 4.90MB/s]
Downloading...
From: https://drive.google.com/uc?id=1LSfAB64VsWp2s0vYYD3ma7EP7pEizF5n
To: /lakehouse/default/Files/Original/Juegos en steam.csv
100%|██████████| 5.82M/5.82M [00:00<00:00, 66.1MB/s]
Downloading...
From: https://drive.google.com/uc?id=1AY79WPqiUYTjkYhvJ2NPOUyvjZ7DtZSC
To: /lakehouse/default/Files/Original/steam_games.json
100%|██████████| 38.3M/38.3M [00:00<00:00, 75.2MB/s]
Downloading...
From: https://drive.google.c

Processing file 1sAI6dJ92gbKGIrGIfg1kqexcOPwfaoAL Console_sales.xlsx
Processing file 1j2U0umBKidNL9AT8MnGLNpq3IqeujGk_ Indicadores_del_desarrollo_humano_mundial Banco Mundial.xlsx
Processing file 1LSfAB64VsWp2s0vYYD3ma7EP7pEizF5n Juegos en steam.csv
Processing file 1AY79WPqiUYTjkYhvJ2NPOUyvjZ7DtZSC steam_games.json
Processing file 1WKbJv0mkgX-OeNHN5mJ-h4qC9q1wXrrR user_reviews.json
Processing file 1WHmXVApFOct2DkQ37PUEGpsxJe37RiIb users_items.json
Processing file 1krfoHcqRd0qGv9t4L7MelHStQ6DdIMCk Video Games Sales.csv
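A Drive folder can be referred to either by its shared-folder URL or by its bare ID, so a small helper that derives one from the other keeps the function parameter and the URL in sync. This is a minimal sketch using only `urllib.parse`; the helper names are hypothetical, and it assumes the `https://drive.google.com/drive/folders/<id>` URL shape used in this notebook.

```python
from urllib.parse import urlparse

FOLDER_URL_PREFIX = "https://drive.google.com/drive/folders/"

def folder_url_from_id(folder_id: str) -> str:
    """Build a shared-folder URL from a bare folder ID (hypothetical helper)."""
    return FOLDER_URL_PREFIX + folder_id

def folder_id_from_url(url: str) -> str:
    """Extract the folder ID from a URL shaped like .../drive/folders/<id>?usp=sharing."""
    # urlparse drops the '?usp=sharing' query; the ID is the last path segment
    parts = urlparse(url).path.rstrip("/").split("/")
    if len(parts) < 2 or parts[-2] != "folders":
        raise ValueError(f"Not a Drive folder URL: {url}")
    return parts[-1]

shared = "https://drive.google.com/drive/folders/1hVEIP_DpRx-l-vRYlJiT6AGe7oZb1Plg?usp=sharing"
folder_id = folder_id_from_url(shared)
print(folder_id)  # 1hVEIP_DpRx-l-vRYlJiT6AGe7oZb1Plg
```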


In [5]:
def load_files_to_dataframe(main_folder_path):
    '''
    Function to read JSON, CSV, and XLSX files from a directory, process them,
    and return a dictionary of Pandas DataFrames.

    Parameters:
    - main_folder_path (str): The path to the main folder containing the files, either directly or inside subfolders.

    Returns:
    - dicc (dict): A dictionary where keys are folder or file names and values are Pandas DataFrames.
    '''
    dicc = {}

    for sub_folder in os.listdir(main_folder_path):
        sub_folder_path = os.path.join(main_folder_path, sub_folder)

        # Process files in subfolders
        if os.path.isdir(sub_folder_path):
            folder_name = sub_folder
            dataframe_list = []

            for file in os.listdir(sub_folder_path):
                file_path = os.path.join(sub_folder_path, file)
                dataframe_aux = read_file(file_path, file)
                if dataframe_aux is not None:
                    dataframe_list.append(dataframe_aux)

            if dataframe_list:
                dataframe_object = pd.concat(dataframe_list, axis=0, ignore_index=True)
                dicc[folder_name] = dataframe_object
                print(f'Data from {folder_name} successfully loaded.')

        # Process files in main folder
        elif sub_folder.endswith(('.json', '.csv', '.xlsx')):
            file_path = sub_folder_path
            file_name = os.path.splitext(sub_folder)[0]  # strip only the final extension

            dataframe_aux = read_file(file_path, sub_folder)
            if dataframe_aux is not None:
                dicc[file_name] = dataframe_aux
                print(f'File {file_name} successfully loaded.')

    return dicc


def read_file(file_path, file_name):
    '''
    Helper function to read a file based on its extension and return a DataFrame.
    For JSON files it delegates to read_generic_json, which tries full JSON parsing,
    then line-by-line parsing with ast.literal_eval, then line-by-line json.loads with normalization.

    Parameters:
    - file_path (str): The path to the file.
    - file_name (str): The name of the file.

    Returns:
    - df (pd.DataFrame): A Pandas DataFrame containing the data from the file or None if the file is empty or unsupported.
    '''
    if file_name.endswith('.json'):
        try:
            if os.path.getsize(file_path) > 0:
                return read_generic_json(file_path)
            else:
                print(f'File {file_name} is empty, skipping.')
                return None
        except ValueError as e:
            print(f'Error reading {file_name}: {e}')
            return None
    elif file_name.endswith('.csv'):
        return pd.read_csv(file_path)
    elif file_name.endswith('.xlsx'):
        return pd.read_excel(file_path)
    else:
        print(f'Unsupported file format: {file_name}')
        return None


def read_generic_json(file_path):
    '''
    Function to read JSON files and handle different formats:
    1. Full JSON reading.
    2. Line by line reading using ast.literal_eval.
    3. Line by line normalizing JSON objects.

    Parameters:
    - file_path (str): The path to the JSON file.

    Returns:
    - df (pd.DataFrame): A Pandas DataFrame containing the data from the JSON file,
      or None if all three methods fail.
    '''
    # Attempt 1: read the whole file as a single JSON document
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return pd.DataFrame(data)

    # json.JSONDecodeError is a subclass of ValueError; pd.DataFrame also raises
    # ValueError for data it cannot build a frame from (e.g. a dict of scalars)
    except ValueError:
        print(f'Failed to load full JSON from {file_path}. Trying line-by-line methods.')

    # Attempt 2: read line by line with ast.literal_eval (accepts Python-style
    # dict literals with single quotes, which json.loads rejects)
    rows = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                if line.strip():  # skip blank lines, which literal_eval cannot parse
                    rows.append(ast.literal_eval(line))
        return pd.DataFrame(rows)

    except (ValueError, SyntaxError):
        print(f'Failed to parse JSON with ast.literal_eval for {file_path}. Trying normalization.')

    # Attempt 3: read line by line with json.loads and flatten nested objects
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            json_obj = [json.loads(line) for line in f if line.strip()]
        return pd.json_normalize(json_obj)

    except Exception as e:
        print(f'All parsing methods failed for {file_path}. Error: {e}')
        return None

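Why does the fallback order matter? The output of cell In [7] shows `user_reviews.json` and `users_items.json` failing `json.load` but passing `ast.literal_eval`, while `steam_games.json` needs the third method. A plausible explanation, not verified against the files, is that the first two hold Python-style dict literals (single quotes, `True`) and the third holds strict JSON Lines (`true`, `null`). The stdlib-only sketch below reproduces that behaviour on two made-up lines; `parse_line` is a hypothetical helper and, unlike `read_generic_json`, tries `json.loads` before `ast.literal_eval`.

```python
import ast
import json

def parse_line(line: str):
    """Try strict JSON first, then a Python literal; return (method, value)."""
    try:
        return "json.loads", json.loads(line)
    except json.JSONDecodeError:
        pass
    try:
        return "ast.literal_eval", ast.literal_eval(line)
    except (ValueError, SyntaxError):
        return "failed", None

python_style = "{'id': 1, 'active': True}"             # single quotes, Python True
json_style = '{"id": 1, "active": true, "price": null}'  # strict JSON literals

print(parse_line(python_style))  # ('ast.literal_eval', {'id': 1, 'active': True})
print(parse_line(json_style))    # ('json.loads', {'id': 1, 'active': True, 'price': None})

# literal_eval alone rejects JSON's lowercase true/null, since they are not Python literals
try:
    ast.literal_eval(json_style)
except ValueError as e:
    print("literal_eval failed:", type(e).__name__)
```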

In [6]:
def convert_column_to_string(dataframe, col):
    '''
    Converts a specific column of the DataFrame to string (object dtype).
    Note that missing values (None/NaN) become the literal strings 'None'/'nan'.

    Parameters:
    - dataframe (pd.DataFrame): The DataFrame containing the column.
    - col (str): The column to convert to string.

    Returns:
    - pd.Series: A copy of the column converted to string; the DataFrame itself is not modified.
    '''
    return dataframe[col].astype(str)

def try_save_parquet(dataframe_aux, file_path):
    '''
    Tries to save the DataFrame as a Parquet file. If that fails, converts only the problematic columns to string type (modifying the DataFrame in place) and retries once.
    
    Parameters:
    - dataframe_aux (pd.DataFrame): The DataFrame to be saved.
    - file_path (str): The file path where the Parquet file will be saved.
    
    Returns:
    - None
    '''
    try:
        # Try to convert the full DataFrame to Parquet
        table = pa.Table.from_pandas(dataframe_aux)
        pq.write_table(table, file_path)
        print(f'Dataframe saved successfully at {file_path}')
    except Exception as e:
        print(f"Error saving DataFrame at {file_path}: {e}")
        print("Converting problematic columns to string and retrying...")
        
        # If it fails, identify the problematic columns
        problematic_columns = []
        for col in dataframe_aux.columns:
            try:
                # Try to convert this column alone to an Arrow array
                pa.array(dataframe_aux[col])
            except Exception:
                # If the conversion fails, mark the column as problematic
                problematic_columns.append(col)
        
        # Convert the problematic columns to string
        for col in problematic_columns:
            dataframe_aux[col] = convert_column_to_string(dataframe_aux, col)
            print(f"Column '{col}' converted to string.")

        # Try to save the dataframe again
        try:
            table = pa.Table.from_pandas(dataframe_aux)
            pq.write_table(table, file_path)
            print(f'Dataframe saved successfully after converting problematic columns to string at {file_path}')
        except Exception as final_error:
            print(f"Final error saving DataFrame at {file_path}: {final_error}")

def dataframe_to_parquet(dicc, subfolder_name):
    '''
    Function to save Pandas DataFrames as Parquet files. If the conversion fails, it will convert problematic
    columns to string and retry saving.
    
    Parameters:
    - dicc (dict): A dictionary where keys are folder or file names and values are Pandas DataFrames.
    - subfolder_name (str): The desired subfolder name to be used in the file path.
    
    Returns:
    - None
    '''
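    for key, dataframe_aux in dicc.items():
        # File path
        file_path = f'/lakehouse/default/Files/{subfolder_name}/{key}.parquet'

        # Make sure the destination folder exists before writing
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        # Save the DataFrame
        try_save_parquet(dataframe_aux, file_path)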



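The retry in `try_save_parquet` only runs after a failed write. A cheaper pre-check, sketched below under the assumption that mixed Python types are the main cause of failures (as with the `price` error in the output of cell In [7]), is to list the columns whose non-null values span more than one Python type, the same `apply(type)` idea that `data_summ` uses further down. This is a pandas-only heuristic, not the check pyarrow itself performs; `mixed_type_columns` and the toy data are hypothetical.

```python
import pandas as pd

def mixed_type_columns(df: pd.DataFrame) -> list[str]:
    """Return the names of columns whose non-null values have more than one Python type."""
    mixed = []
    for col in df.columns:
        types = df[col].dropna().map(type).unique()
        if len(types) > 1:
            mixed.append(col)
    return mixed

# Toy data echoing the logged error: 'price' mixes floats and strings
df = pd.DataFrame({
    "app_name": ["Game A", "Game B", "Game C"],
    "price": [4.99, "Free To Play", None],
})
print(mixed_type_columns(df))  # ['price']
```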

In [7]:
# Load files into DataFrames
main_folder_path = '/lakehouse/default/Files/Original'
dicc = load_files_to_dataframe(main_folder_path)

# Save dataframes as parquet files
subfolder_name = 'Data_Parquet'
dataframe_to_parquet(dicc, subfolder_name)


Failed to load full JSON from /lakehouse/default/Files/Original/steam_games.json. Trying line-by-line methods.
Failed to parse JSON with ast.literal_eval for /lakehouse/default/Files/Original/steam_games.json. Trying normalization.
File steam_games successfully loaded.
File Indicadores_del_desarrollo_humano_mundial Banco Mundial successfully loaded.
Failed to load full JSON from /lakehouse/default/Files/Original/user_reviews.json. Trying line-by-line methods.
File user_reviews successfully loaded.
Failed to load full JSON from /lakehouse/default/Files/Original/users_items.json. Trying line-by-line methods.
File users_items successfully loaded.
File Juegos en steam successfully loaded.
File Console_sales successfully loaded.
File Video Games Sales successfully loaded.
Error saving DataFrame at /lakehouse/default/Files/Data_Parquet/steam_games.parquet: ("Could not convert 'Free To Play' with type str: tried to convert to double", 'Conversion failed for column price with type object')
Con
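Converting `price` to string, as `try_save_parquet` does on retry, lets the write succeed but turns every price into text, so later numeric analysis needs another cast. One alternative, shown as a sketch and not as the notebook's approach, is to coerce the column with `pd.to_numeric(..., errors='coerce')` and keep the label in a separate flag column. The column name `is_free_to_play` is hypothetical, and whether labels like 'Free To Play' should become 0.0 instead of NaN is a decision for the client.

```python
import pandas as pd

# Toy column mirroring the failing values from the log above
df = pd.DataFrame({"price": [4.99, "Free To Play", 19.99, None]})

# Keep a boolean flag for the label before coercing (hypothetical column name)
df["is_free_to_play"] = df["price"].eq("Free To Play")

# Non-numeric entries become NaN, so the column ends up as float64
df["price"] = pd.to_numeric(df["price"], errors="coerce")

print(df.dtypes["price"])        # float64
print(df["price"].isna().sum())  # 2
```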

In [8]:
def data_summ(df, title=None):
    '''
    Function to provide detailed information about the data types and null values
    of each column in a DataFrame.

    Parameters:
    - df (pd.DataFrame): The DataFrame for which information is to be generated.
    - title (str, optional): Title to be used in the summary. If None, the title and the row counts are omitted.

    Returns:
    - df_info (pd.DataFrame): A DataFrame containing information about each column,
                              including the Python types found in it, the non-missing quantity,
                              the percentage of missing values, and the missing quantity.
    '''
    info_dict = {"Column": [], "Data_type": [], "No_miss_Qty": [], "%Missing": [], "Missing_Qty": []}

    for column in df.columns:
        info_dict["Column"].append(column)
        info_dict["Data_type"].append(df[column].apply(type).unique())
        info_dict["No_miss_Qty"].append(df[column].count())
        info_dict["%Missing"].append(round(df[column].isnull().sum() * 100 / len(df), 2))
        info_dict['Missing_Qty'].append(df[column].isnull().sum())

  
    df_info = pd.DataFrame(info_dict)

    if title:
        print(f"{title} Summary")
        print("\nTotal rows: ", len(df))
        print("\nTotal full null rows: ", df.isna().all(axis=1).sum())

    print(df_info.to_string(index=False))
    print("=====================================")

    return df_info

def data_summ_on_parquet(folder_path):
    '''
    Function to apply data_summ function to each Parquet file in a folder.

    Parameters:
    - folder_path (str): The path to the folder containing Parquet files.

    Returns:
    - summaries (list): A list of DataFrames containing the summary information for each Parquet file.
    '''
    summaries = []

    # Loop through each file in the folder
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)

        # Check if the file is a Parquet file
        if file_name.endswith('.parquet'):
            # Read the Parquet file into a DataFrame
            df = pq.read_table(file_path).to_pandas()

            # Get the title for the DataFrame based on the file name
            title = file_name.replace('.parquet', '')

            # Apply data_summ function to the DataFrame
            summary = data_summ(df, title=title)

            # Append the summary DataFrame to the list
            summaries.append(summary)

    return summaries

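The per-column loop in `data_summ` computes, for each column, the non-missing count, the missing count, and the missing percentage as missing count × 100 / total rows. The same three numbers can be obtained without a Python loop using `DataFrame.count` and `DataFrame.isna`, as in this sketch; `missing_summary` and the toy data are hypothetical, and it omits the `Data_type` column that `data_summ` also reports.

```python
import pandas as pd

def missing_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Vectorized equivalent of the count and missing-value columns built by data_summ."""
    missing = df.isna().sum()
    return pd.DataFrame({
        "Column": df.columns,
        "No_miss_Qty": df.count().values,
        "%Missing": (missing * 100 / len(df)).round(2).values,
        "Missing_Qty": missing.values,
    })

# Toy data: one missing developer out of four rows
df = pd.DataFrame({
    "developer": ["Dev A", None, "Dev A", "Dev B"],
    "english": [1, 1, 0, 1],
})
print(missing_summary(df).to_string(index=False))
```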

In [9]:
folder_path = '/lakehouse/default/Files/Data_Parquet' 
data_summ_on_parquet(folder_path)


Juegos en steam Summary

Total rows:  27075

Total full null rows:  0
          Column                           Data_type  No_miss_Qty  %Missing  Missing_Qty
           appid                     [<class 'int'>]        27075      0.00            0
            name                     [<class 'str'>]        27075      0.00            0
    release_date                     [<class 'str'>]        27075      0.00            0
         english                     [<class 'int'>]        27075      0.00            0
       developer [<class 'str'>, <class 'NoneType'>]        27074      0.00            1
       publisher [<class 'str'>, <class 'NoneType'>]        27061      0.05           14
       platforms                     [<class 'str'>]        27075      0.00            0
    required_age                     [<class 'int'>]        27075      0.00            0
      categories                     [<class 'str'>]        27075      0.00            0
          genres                     [<c

[              Column                            Data_type  No_miss_Qty  \
 0              appid                      [<class 'int'>]        27075   
 1               name                      [<class 'str'>]        27075   
 2       release_date                      [<class 'str'>]        27075   
 3            english                      [<class 'int'>]        27075   
 4          developer  [<class 'str'>, <class 'NoneType'>]        27074   
 5          publisher  [<class 'str'>, <class 'NoneType'>]        27061   
 6          platforms                      [<class 'str'>]        27075   
 7       required_age                      [<class 'int'>]        27075   
 8         categories                      [<class 'str'>]        27075   
 9             genres                      [<class 'str'>]        27075   
 10     steamspy_tags                      [<class 'str'>]        27075   
 11      achievements                      [<class 'int'>]        27075   
 12  positive_ratings    