# ETL Pipeline for Movie Analytics
## A modular, production-grade data pipeline built in Python using pandas

This notebook showcases a scalable Extract, Transform, Load (ETL) pipeline designed for analytics workflows in the entertainment domain. It ingests raw CSV/TSV datasets from IMDb, The Numbers and MovieLens, performs column-level cleaning and profit and ROI enrichment, and produces analysis-ready DataFrames and CSV exports.

I chose this code structure to reflect best practices in data engineering, including:
- Clean OOP abstraction (DataExtractor, DataTransformer, DataLoader, ETLPipeline)
- Strict typing & docstring standards
- Idempotent, testable transformations
- Modular design allowing quick substitution of data sources (e.g. S3, SQL, API)
- Local and remote execution compatibility

In this project, each stage of the pipeline (Extract, Transform, Load and Orchestrate) is implemented as a Python class. The classes are as follows:
- `DataExtractor`
- `DataTransformer`
- `DataLoader`
- `ETLPipeline`

A class is a blueprint for creating objects that bundle data (attributes) and behaviour (methods) together. It allows us to model real-world entities as self-contained, reusable components.
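As a minimal illustration of attributes and methods, here is a small sketch. The `Movie` class, its fields and the sample values are hypothetical and are not part of the pipeline.

```python
class Movie:
    """Hypothetical example class; not used by the pipeline."""

    def __init__(self, title: str, release_year: int):
        # Attributes: the data each Movie object carries.
        self.title = title
        self.release_year = release_year

    def age(self, current_year: int) -> int:
        # Method: behaviour that operates on the object's own data.
        return current_year - self.release_year


film = Movie("Example Film", release_year=2012)
print(film.age(current_year=2024))  # 12
```

Each pipeline class follows the same pattern: configuration is stored as attributes in `__init__`, and the work is done by a method.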

### DataExtractor
The DataExtractor class extracts tabular data from remote URLs (.csv, .tsv) using robust pandas parsing and validates file types. It then returns a dictionary of DataFrames keyed by table name.
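To make the return value concrete, the sketch below applies the same idea to two small local files rather than remote URLs. The file names, table names and values are made up for illustration.

```python
import os
import tempfile

import pandas as pd

with tempfile.TemporaryDirectory() as tmp:
    # Write two tiny sample files with made-up contents.
    csv_path = os.path.join(tmp, "films.csv")
    tsv_path = os.path.join(tmp, "grosses.tsv")
    with open(csv_path, "w", encoding="utf-8") as handle:
        handle.write("movieId,title\n1,Example Film\n")
    with open(tsv_path, "w", encoding="utf-8") as handle:
        handle.write("Title\tGross\nExample Film\t$1,000\n")

    # Same shape as the `tables` argument: source location -> table name.
    sources = {csv_path: "films", tsv_path: "grosses"}

    frames = {}
    for path, table_name in sources.items():
        # Pick the delimiter from the file extension.
        separator = "\t" if path.endswith(".tsv") else ","
        frames[table_name] = pd.read_csv(path, sep=separator)

print(sorted(frames))                     # table names are the dictionary keys
print(frames["grosses"].loc[0, "Gross"])  # still a raw string at this stage
```

Note that the currency value is still a string after extraction; converting it to a number is the transformer's job.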


In [0]:
clean_folder = 'path/clean_folder'
raw_folder = 'path/raw_folder'
transformed_folder = 'path/transformed_folder'

import pandas as pd
import os

class DataExtractor:
    """
    DataExtractor is responsible for extracting data from given URLs (CSV/TSV) and loading them into pandas DataFrames.

    Args:
        tables (dict): A dictionary mapping URLs to table names.

    Methods:
        extract(): Reads each file from the URLs and returns a dictionary of DataFrames keyed by table name.
    """
    def __init__(self, tables: dict):
        """
        Initializes the DataExtractor with a dictionary of table URLs and names.

        Args:
            tables (dict): A dictionary mapping URLs to table names.
        """
        self.tables = tables
    
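    def extract(self) -> dict:
        """
        Reads each file from the configured URLs into a pandas DataFrame.

        Returns:
            dict: A dictionary of DataFrames keyed by table name.

        Raises:
            ValueError: If a URL does not end in '.csv' or '.tsv'.
        """
        data = {}
        for url, table_name in self.tables.items():
            if url.endswith('.csv'):
                data[table_name] = pd.read_csv(url, engine="python")
            elif url.endswith('.tsv'):
                data[table_name] = pd.read_csv(url, delimiter='\t', engine="python")
            else:
                # Fail loudly instead of silently skipping unsupported file types.
                raise ValueError(f"Unsupported file type for table '{table_name}': {url}")
        return data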


### DataTransformer
The DataTransformer cleans the numeric columns in our DataFrames by removing currency symbols and commas. It also coerces types safely with `errors="coerce"`, so that when converting strings to numbers, for example, any invalid values become NaN instead of raising an error. In addition, it supports configurable per-table cleaning specifications.
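The effect of stripping symbols and then coercing can be seen on a small Series. This is a sketch with made-up values, not data from the actual sources.

```python
import pandas as pd

# Made-up raw values: two parseable amounts, one placeholder, one missing.
raw = pd.Series(["$1,234,567", "980", "N/A", None])

# Remove the currency symbol and thousands separators as literal strings.
stripped = (
    raw.str.replace("$", "", regex=False)
       .str.replace(",", "", regex=False)
)

# errors="coerce" turns anything unparseable into NaN instead of raising.
cleaned = pd.to_numeric(stripped, errors="coerce")
print(cleaned)
```

The first two values become numbers, while `"N/A"` and the missing entry both end up as NaN, so one bad cell does not stop the whole column from being converted.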

In [0]:
class DataTransformer:
    """
    DataTransformer is responsible for cleaning configured columns in each table.

    Args:
        columns_to_clean (dict): A dictionary mapping table names to lists of column names to clean.

    Methods:
        clean(data): Strips '$' and ',' from the configured columns and converts them to numeric types.
    """
    def __init__(self, columns_to_clean: dict):
        """
        Initializes the DataTransformer with a dictionary of columns to clean for each table.
        """
        self.columns_to_clean = columns_to_clean

    def clean(self, data):
        """
        Cleans specified columns across the provided tables.

        For each configured column, it removes '$' and ',' from the column values and converts them to numeric types.
        (invalid values are set to NaN)
        
        Args:
            data (dict): A dictionary of DataFrames keyed by table name.
        Returns:
            dict: The cleaned DataFrames.
        """
        for table_name, columns in self.columns_to_clean.items():
            for col in columns:
                if data[table_name][col].dtype == 'object':
                    data[table_name][col] = (
                        data[table_name][col]
                        .str.replace('$', '', regex=False)
                        .str.replace(',', '', regex=False)
                    )
                    data[table_name][col] = pd.to_numeric(data[table_name][col], errors='coerce')
        return data

### DataLoader
The DataLoader writes the DataFrames to CSV files in structured folders and ensures the destination folders exist. It can be extended to upload to Amazon S3, Google Cloud Storage or a SQL database.
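As one possible extension, the sketch below swaps the CSV destination for a local SQLite database using `DataFrame.to_sql`. The `SQLiteLoader` class, the database file name and the sample data are hypothetical. The class keeps the same `load(data)` interface so it could stand in for `DataLoader`.

```python
import os
import sqlite3
import tempfile

import pandas as pd


class SQLiteLoader:
    """Hypothetical loader that writes each DataFrame to a SQLite table."""

    def __init__(self, database_path: str):
        self.database_path = database_path

    def load(self, data: dict) -> None:
        connection = sqlite3.connect(self.database_path)
        try:
            for table_name, df in data.items():
                # Replace the table on each run so reruns do not duplicate rows.
                df.to_sql(table_name, connection, if_exists="replace", index=False)
            connection.commit()
        finally:
            connection.close()


# Demonstrate with made-up data in a temporary database file.
with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "movies.db")
    sample = {"films": pd.DataFrame({"title": ["Example Film"], "gross": [1000]})}
    SQLiteLoader(db_path).load(sample)

    connection = sqlite3.connect(db_path)
    stored = pd.read_sql_query("SELECT * FROM films", connection)
    connection.close()

print(stored)
```

Using `if_exists="replace"` keeps the load step safe to rerun, which matches the idempotency goal stated earlier.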

In [0]:
class DataLoader:
    """
    DataLoader is responsible for saving DataFrames to CSV files in a specified destination folder.

    Args:
        destination_folder (str): The folder path where the CSV files will be saved.

    Methods:
        load(data): Saves each DataFrame in the data dictionary to a CSV file in the destination folder.
    """
    def __init__(self, destination_folder: str):
        """
        Initializes the DataLoader with the specified destination folder.

        Args:
            destination_folder (str): The folder path where the CSV files will be saved.
        """
        self.destination_folder = destination_folder

    def load(self, data):
        """
        Saves each DataFrame in the data dictionary to a CSV file in the destination folder.

        The destination folder is created if it does not already exist.

        Args:
            data (dict): A dictionary of DataFrames keyed by table name.
        """
        os.makedirs(self.destination_folder, exist_ok=True)
        for table_name, df in data.items():
            path = os.path.join(self.destination_folder, f"{table_name}.csv")
            df.to_csv(path, index=False)

### ETLPipeline
The ETLPipeline class orchestrates the stages set out in the previous classes.
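The transform step in `run()` is left as a placeholder. As a rough sketch of the profit and ROI enrichment mentioned in the introduction, a transform function could look like the one below. The function name and the formulas (profit as worldwide gross minus budget, ROI as profit divided by budget) are assumptions rather than part of the original pipeline. The column names are borrowed from the cleaning configuration and the values are made up.

```python
import numpy as np
import pandas as pd


def add_profit_and_roi(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical transform: add Profit and ROI columns to a copy of df."""
    enriched = df.copy()
    enriched["Profit"] = (
        enriched["Worldwide_Box_Office"] - enriched["Production_Budget"]
    )
    # Treat a zero budget as missing so ROI becomes NaN rather than infinity.
    budget = enriched["Production_Budget"].replace(0, np.nan)
    enriched["ROI"] = enriched["Profit"] / budget
    return enriched


sample = pd.DataFrame(
    {
        "Production_Budget": [100.0, 0.0],
        "Worldwide_Box_Office": [250.0, 50.0],
    }
)
result = add_profit_and_roi(sample)
print(result)
```

Because the function returns a new DataFrame and leaves its input untouched, it can be tested in isolation and applied to the cleaned tables before the final load.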

In [0]:

class ETLPipeline:
    """
    ETLPipeline orchestrates the ETL process: extraction, cleaning, and saving of movie-related datasets.

    Args:
        tables (dict): Mapping of URLs to table names.
        columns_to_clean (dict): Columns to clean for each table.
        folders (tuple): Tuple of (raw_folder, clean_folder, transformed_folder) paths.

    Methods:
        run(): Executes the ETL pipeline steps.
    """
    def __init__(self, tables, columns_to_clean, folders):
        """
        Initialize the pipeline and its components.

        Args:
            tables (dict): Mapping of URLs to table names.
            columns_to_clean (dict): Columns to clean for each table.
            folders (tuple): Tuple of (raw_folder, clean_folder, transformed_folder) paths.
        """
        self.extractor = DataExtractor(tables)
        self.transformer = DataTransformer(columns_to_clean)
        self.raw_folder, self.clean_folder, self.transformed_folder = folders

    def run(self):
        """
        Extracts and saves the raw data, cleans and saves the cleaned data, then transforms and saves the transformed data.
        """
        print("Extracting data...")
        data = self.extractor.extract()

        print("Saving raw data...")
        DataLoader(self.raw_folder).load(data)

        print("Cleaning data...")
        cleaned = self.transformer.clean(data)
        DataLoader(self.clean_folder).load(cleaned)

        print("Transforming data...")
        # your transform_data() logic can go here
        transformed = cleaned
        DataLoader(self.transformed_folder).load(transformed)

        print("ETL complete!")

In [0]:
# Run the ETL pipeline
if __name__ == "__main__":
    TABLES = {
    "https://raw.githubusercontent.com/mansik95/IMDB-Analysis/master/Data/MovieLens_movies.csv": "movies_Id",
    "https://raw.githubusercontent.com/mansik95/IMDB-Analysis/master/Data/IMDb%20BoxOfficeMojo%20-%20Brands%20(US%20%26%20Canada).tsv": "brands_US_and_Canada",
    "https://raw.githubusercontent.com/mansik95/IMDB-Analysis/master/Data/IMDb%20BoxOfficeMojo%20-%20Brand_%20Marvel%20Comics.tsv": "brand_marvel_comics",
    "https://raw.githubusercontent.com/mansik95/IMDB-Analysis/master/Data/The%20Numbers%20-%20Domestic%20Box%20Office%20Daily%20-%20The%20Avengers.tsv": "Domestic_Box_Office_Daily_The_Avengers",
    "https://raw.githubusercontent.com/mansik95/IMDB-Analysis/master/Data/The%20Numbers%20-%20Domestic%20Box%20Office%20-%20Franchises.tsv": "Domestic_Box_Office_Franchises",
    "https://raw.githubusercontent.com/mansik95/IMDB-Analysis/master/Data/The%20Numbers%20-%20Domestic%20Box%20Office%20-%20Franchises%20-%20Marvel%20Cinematic%20Universe.tsv": "Domestic_Box_Office_Franchises_Marvel_Cinematic",
    "https://raw.githubusercontent.com/mansik95/IMDB-Analysis/master/Data/World%20Wide%20Box%20Office%20All%20Time%20Top%201000.tsv": "World_Wide_Box_Office_All_Time_Top_1000",
    "https://raw.githubusercontent.com/mansik95/IMDB-Analysis/master/Data/IMDb%20BoxOfficeMojo%20-%20Franchises%20(US%20%26%20Canada).tsv": "Franchises_us_and_Canada",
    "https://raw.githubusercontent.com/mansik95/IMDB-Analysis/master/Data/IMDb%20BoxOfficeMojo%20-%20Franchise_%20top20.tsv": "top_20_for_each_Franchise",
    "https://raw.githubusercontent.com/mansik95/IMDB-Analysis/master/Data/MovieLens_tags.csv": "tags"
    }

    COLUMNS_TO_CLEAN = {
    'Domestic_Box_Office_Franchises': ['Domestic_Box_Office', 'Infl_Adj_Dom_Box_Office', 'Worldwide_Box_Office'],
    'Domestic_Box_Office_Franchises_Marvel_Cinematic': ['Production_Budget', 'Opening_Weekend', 'Domestic_Box_Office', 'Worldwide_Box_Office'],
    'top_20_for_each_Franchise': ['Lifetime_Gross','Opening_Gross','Max_Theaters'],
    'brand_marvel_comics': ['Lifetime_Gross','Max_Theaters','Opening_Gross','Open_Theaters'],
    'Franchises_us_and_Canada': ['Lifetime_Gross','Total_Revenue'],
    'brands_US_and_Canada': ['Total_Gross','Lifetime_Gross'],
    'Domestic_Box_Office_Daily_The_Avengers': ['Gross','Theaters','Per Theater','Total Gross'],
    }

    # Order must match the unpacking in ETLPipeline.__init__: raw, clean, transformed.
    FOLDERS = (raw_folder,
               clean_folder,
               transformed_folder)
    

    # Keep the run inside the main guard, where TABLES, COLUMNS_TO_CLEAN and FOLDERS are defined.
    pipeline = ETLPipeline(TABLES, COLUMNS_TO_CLEAN, FOLDERS)
    pipeline.run()