## Place where I test petl.

API => CSV

In [6]:
import requests
import petl as etl

api_url = 'https://swapi.dev/api/people'
response = requests.get(api_url)

if response.status_code == 200:
    json_data = response.json()['results']
    table = etl.fromdicts(json_data)
    etl.tocsv(table, 'output.csv')
    print("CSV file created successfully.")
else:
    print("Failed to fetch data from the API.")

CSV file created successfully.


API * all_pages => CSV

In [7]:
import requests
import petl as etl

def fetch_all_characters(api_url):
    all_characters = []
    while api_url:
        response = requests.get(api_url)
        if response.status_code == 200:
            json_data = response.json()
            all_characters.extend(json_data['results'])
            api_url = json_data['next']
        else:
            print(f"Failed to fetch data from the API. Status code: {response.status_code}")
            return None
    return all_characters

initial_api_url = 'https://swapi.dev/api/people/'
characters = fetch_all_characters(initial_api_url)

if characters:
    table = etl.fromdicts(characters)
    etl.tocsv(table, 'star_wars_characters_list.csv')
    print("CSV file created successfully.")
else:
    print("No characters fetched from the API.")

CSV file created successfully.


Moving to class, separating checking for status code, added MAX_PAGES to avoid for the code to work indefinietely

In [13]:
import requests
import petl as etl

class SWAPIHandler:
    MAX_PAGES = 10

    def __init__(self):
        self.base_url = 'https://swapi.dev/api/people/'

    def _handle_response(self, response):
        if response.status_code != 200:
            print(f"Failed to fetch data from the API. Status code: {response.status_code}")
            return None
        return response.json()

    def fetch_all_characters(self):
        all_characters = []
        api_url = self.base_url
        page_count = 0
        while api_url and page_count < self.MAX_PAGES:
            response = requests.get(api_url)
            json_data = self._handle_response(response)
            if not json_data:
                return None
            all_characters.extend(json_data['results'])
            api_url = json_data['next']
            page_count += 1
        return all_characters

class TestSWAPIHandler(SWAPIHandler):
    def export_to_csv(self, filename):
        characters = self.fetch_all_characters()
        if characters:
            table = etl.fromdicts(characters)
            etl.tocsv(table, filename)
            print("CSV file created successfully.")
        else:
            print("No characters fetched from the API.")


# Example usage
exporter = TestSWAPIHandler()
exporter.export_to_csv("output.csv")

CSV file created successfully.


In [13]:
"""
Logic behind main points:
1. Data cleaning inside a loop - In the requirements of this task was stated
that I need to keep memory footprint as low as possible. Therefore even tasks
with O(1) complexity are done inside a loop to avoid storing all data in memory,
which increases redundancy.
2. MAX_PAGES - Simple approach to limit the number of pages fetched from the API.
"""

import requests
import petl as etl

class SWAPIHandler:
    MAX_PAGES = 10
    BASE_URL = 'https://swapi.dev/api/people/'
    
    def handle_swapi(self):
        page_count = 0
        api_url = self.BASE_URL
        all_data = []


        while api_url and page_count < self.MAX_PAGES:
            response = requests.get(api_url)

            if response.status_code == 200:
                data = response.json()['results']

            all_data.extend(data)

            api_url = response.json()['next']
            page_count += 1

        table = etl.fromdicts(all_data)
        cleaned_data = self.clean_data(table)
        
        return cleaned_data

    def clean_data(self, data):
        cleaned_data = etl.cutout(
            data, 
            'films', 
            'species', 
            'vehicles', 
            'starships', 
            'created', 
            'edited'
            )

        
        return cleaned_data
    
class TestSWAPIHandler(SWAPIHandler):
    def convert_to_csv(self):
        table = self.handle_swapi()
        csv_buffer = "output.csv"
        etl.tocsv(table, csv_buffer)


exporter = TestSWAPIHandler()
exporter.convert_to_csv()


In [13]:
"""
Logic behind main points:
1. Data cleaning inside a loop - In the requirements of this task was stated
that I need to keep memory footprint as low as possible. Therefore even tasks
with O(1) complexity are done inside a loop to avoid storing all data in memory,
which increases redundancy.
2. MAX_PAGES - Simple approach to limit the number of pages fetched from the API.
"""

import requests
import petl as etl


class SWAPIHandler:
    MAX_PAGES = 10
    BASE_URL = 'https://swapi.dev/api/people/'
    planet_dict : dict[str, str] = {}

    def handle_swapi(self):
        page_count = 0
        api_url = self.BASE_URL
        all_data = []


        while api_url and page_count < self.MAX_PAGES:
            response = requests.get(api_url)

            if response.status_code == 200:
                data = response.json()['results']

            all_data.extend(data)

            api_url = response.json()['next']
            page_count += 1

        table = etl.fromdicts(all_data)
        cleaned_data = self.clean_data(table)

        return cleaned_data

    def clean_data(self, data):
        data = etl.convert(data, 'homeworld', lambda row: self.resolve_homeworld(row))
        data = etl.addfield(data, 'date', lambda row: self.get_left_substring(row['edited'], 'T'))
        data = etl.cutout(
            data, 
            'films', 
            'species', 
            'vehicles', 
            'starships', 
            'created', 
            'edited',
            'url'
            )

        return data
    
    def get_left_substring(self, string: str, to_find: str) -> str:
        index_of_t = string.find(to_find)
        
        if index_of_t != -1:
            return string[:index_of_t]
        else:
            return string
        
    def resolve_homeworld(self, row: str) -> str:
        if row in self.planet_dict:
            row = self.planet_dict[row]
        else:
            if row.startswith('https'):
                key = row
                response = self.get_data_from_api(row)
                if response:
                    row = response['name']
                    self.planet_dict[key] = row
        return row
    
    def get_data_from_api(self, url: str) -> dict:
        response = requests.get(url)
        if response.status_code == 200:
            return response.json()
        else:
            return None
    
class TestSWAPIHandler(SWAPIHandler):
    def convert_to_csv(self, filename: str) -> None:
        table = self.handle_swapi()
        csv_buffer = filename
        etl.tocsv(table, csv_buffer)

In [12]:
exporter = TestSWAPIHandler()
exporter.convert_to_csv("output.csv")

<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>
<class 'dict'>


In [92]:
import aiohttp
import asyncio
import pandas as pd

async def fetch_page(session, url: str) -> dict:
    """
    Fetches a page.
    """
    async with session.get(url) as response:
        return await response.json()

async def fetch_all_pages(start_url: str) -> list[dict]:
    """
    Fetches all pages.
    """
    data = []
    url = start_url

    async with aiohttp.ClientSession() as session:
        tasks = []
        while url:
            tasks.append(fetch_page(session, url))
            url = await get_next_url(session, url)

        results = await asyncio.gather(*tasks)
        print("RESULTS: ", type(results))
        for result in results:
            if result:
                data.extend(result['results'])

    return data

async def get_next_url(session, url: str) -> str:
    """
    Retrieves the next URL from the current page.
    """
    async with session.get(url) as response:
        page_data = await response.json()
        return page_data['next']

async def main() -> None:
    start_url = 'https://swapi.dev/api/people/?page=1'
    all_data = await fetch_all_pages(start_url)

    df = pd.DataFrame(all_data)
    df.to_csv('output3.csv', index=False)

if __name__ == "__main__":
    try:
        # For Jupyter and other interactive environments
        import nest_asyncio
        nest_asyncio.apply()

        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    except RuntimeError:
        # For standard Python scripts
        asyncio.run(main())

RESULTS:  <class 'list'>


In [8]:

# 2. Make an api call as a function with proper error handling.

import aiohttp
import asyncio
import logging
import petl as etl

class SWAPIHandler:

    def __init__(self):
        self.planet_dict = {}
        
    async def get_data_from_api(self, url, session):
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientResponseError as e:
            logging.error(f"Error fetching data from {url}: {e}")
            return None

    async def resolve_homeworld(self, session, url: str) -> str:
        if url in self.planet_dict:
            return self.planet_dict[url]
        else:
            if url.startswith('https'):
                response = await self.get_data_from_api(url, session)
                if response:
                    planet_name = response['name']
                    self.planet_dict[url] = planet_name
            return planet_name  

    async def clean_data(self, session, data: list[dict]) -> etl.Table:
        data = etl.fromdicts(data)
        data = etl.convert(
            data, 
            'homeworld', 
            lambda row: self.resolve_homeworld(session, row)
            )
        data = etl.addfield(data, 'date', lambda row: row['edited'].split('T')[0])
        data = etl.cutout(
            data, 
            'films', 
            'species', 
            'vehicles', 
            'starships', 
            'created', 
            'edited',
            'url'
            )
        return data
    
    async def get_data_and_url(self, session, url: str) -> tuple[list[dict], str]:
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                swapi = await response.json()
                data = swapi['results']
                url = swapi['next']

                return data, url
        except aiohttp.ClientResponseError as e:
            logging.error(f"Error fetching data from {url}: {e}")
            return [], None
        
    async def fetch_all_pages(self, start_url: str) -> None:
        headers_included: bool = False
        url = start_url

        async with aiohttp.ClientSession() as session:
            while url:
                try:
                    data, url = await self.get_data_and_url(session, url)
                    results = await self.clean_data(session, data)

                    if not headers_included:
                        etl.tocsv(results, 'output2.csv')
                        headers_included = True
                    else:
                        etl.appendcsv(results, 'output2.csv')
                        
                except Exception as e:
                    logging.error(f"Error processing data from {url}: {e}")
                    break

if __name__ == "__main__":
    try:
        # For Jupyter and other interactive environments
        import nest_asyncio
        nest_asyncio.apply()

        loop = asyncio.get_event_loop()
        loop.run_until_complete(SWAPIHandler().fetch_all_pages('https://swapi.dev/api/people/'))
    except RuntimeError:
        # For standard Python scripts
        asyncio.run(SWAPIHandler().fetch_all_pages('https://swapi.dev/api/people/'))

In [9]:
import aiohttp
import asyncio
import logging
import petl as etl

class SWAPIHandler:
    def __init__(self):
        self.planet_dict = {}
        
    async def get_data_from_api(self, url, session):
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientResponseError as e:
            logging.error(f"Error fetching data from {url}: {e}")
            return None

    async def resolve_homeworld(self, session, url: str) -> str:
        if url in self.planet_dict:
            return self.planet_dict[url]
        else:
            if url.startswith('https'):
                response = await self.get_data_from_api(url, session)
                if response:
                    planet_name = response['name']
                    self.planet_dict[url] = planet_name
                    return planet_name  
            return None

    async def preprocess_homeworlds(self, session, data: list[dict]) -> list[dict]:
        for row in data:
            if 'homeworld' in row and row['homeworld']:
                row['homeworld'] = await self.resolve_homeworld(session, row['homeworld'])
        return data
    
    def clean_data(self, data: list[dict]) -> etl.Table:
        data = etl.fromdicts(data)
        data = etl.addfield(data, 'date', lambda row: row['edited'].split('T')[0])
        data = etl.cutout(
            data, 
            'films', 
            'species', 
            'vehicles', 
            'starships', 
            'created', 
            'edited',
            'url'
            )
        return data
    
    async def get_data_and_url(self, session, url: str) -> tuple[list[dict], str]:
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                swapi = await response.json()
                data = swapi['results']
                url = swapi['next']
                return data, url    
        except aiohttp.ClientResponseError as e:
            logging.error(f"Error fetching data from {url}: {e}")
            return [], None
        
    async def fetch_all_pages(self, start_url: str) -> None:
        headers_included: bool = False
        url = start_url

        async with aiohttp.ClientSession() as session:
            while url:
                try:
                    data, url = await self.get_data_and_url(session, url)
                    data = await self.preprocess_homeworlds(session, data)
                    results = self.clean_data(data)

                    if not headers_included:
                        etl.tocsv(results, 'output2.csv')
                        headers_included = True
                    else:
                        etl.appendcsv(results, 'output2.csv')
                        
                except Exception as e:
                    logging.error(f"Error processing data from {url}: {e}")
                    break

if __name__ == "__main__":
    try:
        # For Jupyter and other interactive environments
        import nest_asyncio
        nest_asyncio.apply()

        loop = asyncio.get_event_loop()
        loop.run_until_complete(SWAPIHandler().fetch_all_pages('https://swapi.dev/api/people/'))
    except RuntimeError:
        # For standard Python scripts
        asyncio.run(SWAPIHandler().fetch_all_pages('https://swapi.dev/api/people/'))

In [None]:
class SWAPIHandler:

    def __init__(self):
        self.planet_dict = {}
        
    async def get_data_from_api(self, url, session):
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientResponseError as e:
            logging.error(f"Error fetching data from {url}: {e}")
            return None

    async def resolve_homeworld(self, session, url: str) -> str:
        if url in self.planet_dict:
            return self.planet_dict[url]
        else:
            planet_name = None
            if url.startswith('https'):
                response = await self.get_data_from_api(url, session)
                if response:
                    planet_name = response['name']
                    self.planet_dict[url] = planet_name
            return planet_name

    async def clean_data(self, session, data: list[dict]) -> etl.Table:
        data = etl.fromdicts(data)
        data = etl.convert(
            data, 
            'homeworld', 
            lambda row: await self.resolve_homeworld(session, row['homeworld'])
            )
        data = etl.addfield(data, 'date', lambda row: row['edited'].split('T')[0])
        data = etl.cutout(
            data, 
            'films', 
            'species', 
            'vehicles', 
            'starships', 
            'created', 
            'edited', 
            'url'
            )
        return data
    
    async def get_data_and_url(self, session, url: str) -> tuple[list[dict], str]:
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                swapi = await response.json()
                data = swapi['results']
                url = swapi['next']

                return data, url
        except aiohttp.ClientResponseError as e:
            logging.error(f"Error fetching data from {url}: {e}")
            return [], None
        
    async def fetch_all_pages(self, start_url: str) -> etl.MemorySource:
        headers_included: bool = False
        url = start_url
        csv_buffer = etl.MemorySource()

        async with aiohttp.ClientSession() as session:
            while url:
                try:
                    data, url = await self.get_data_and_url(session, url)
                    results = await self.clean_data(session, data)

                    if not headers_included:
                        etl.tocsv(results, csv_buffer, encoding='utf-8')
                        headers_included = True
                    else:
                        etl.appendcsv(results, csv_buffer, encoding='utf-8')
                        
                except Exception as e:
                    logging.error(f"Error processing data from {url}: {e}")
                    break
        
        return csv_buffer