## Import libraries

In [2]:
# Libraries for data manipulation and analysis
import pandas as pd  # Handle structured data
import numpy  # Numerical computations

# Libraries for web requests and parsing
import requests  # HTTP requests (e.g., APIs)
import feedparser  # Parse RSS/Atom feeds

# Libraries for database and time handling
import datetime  # Work with dates and times
import psycopg2  # Interact with PostgreSQL databases

from dotenv import load_dotenv
import os

## Enhance the dataframe visibility

In [3]:
# Configure pandas to improve DataFrame display
pd.set_option('display.max_rows', None)  # Show all rows in output
pd.set_option('display.max_columns', None)  # Show all columns in output
pd.set_option('display.width', 1000)  # Set the maximum display width
pd.set_option('display.colheader_justify', 'left')  # Align column headers to the left


## PART 1: Extraction & Loading

Part 1: Extract & Load
Extract data from the English Wikipedia API using Python and load it into a simple database
of your choice (e.g. Postgres, DuckDB). Choose a local setup and make sure to document
how to reproduce it. Pick a database supported by dbt.
1. Extract recent changes made on 31 October 2024 and all pages included in those
changes.
2. Load this data into your database; think about what data objects you create and
what your pipeline looks like.
3. Pay attention to readability, reusability and maintainability of your code. Document
any classes, methods or scripts. Also, consider performance.
Tip: use a query on the recent changes list to retrieve the data.

# Extraction

In [6]:

def fetch_recent_changes(start_date):
    """
    Fetches recent changes from the MediaWiki API in JSON format and processes the response.

    Args:
        start_date (str): The starting date in ISO 8601 format (e.g., "2024-10-31T00:00:00Z").

    Returns:
        DataFrame: A dataset of recent changes with adjusted titles.
    """
    # Base URL for the MediaWiki API
    API_URL = "https://en.wikipedia.org/w/api.php"

    # Define parameters for the API request
    params = {
        "action": "query",  # API action type
        "list": "recentchanges",  # Fetch the list of recent changes
        "rcstart": start_date,  # Start date for recent changes
        "rclimit": "max",  # Maximum number of results allowed by the API(by default 500)
        "rcprop": "title|timestamp|user|userid|comment",  # Properties to retrieve for each change
        "format": "json"  # Response format
    }

    # Send a GET request to the API
    response = requests.get(API_URL, params=params)
    # Raise an HTTPError if the response contains an HTTP error status code
    response.raise_for_status()

    # Print the request URL for debugging purposes
    print("Request URL:", response.url)

    # Extract the relevant data from the JSON response
    data = response.json().get('query', {}).get('recentchanges', [])
    
    # Process titles to remove prefixes before the first colon (:)
    for change in data:
        if 'title' in change and ":" in change['title']:
            change['title'] = change['title'].split(":", 1)[-1]

    return pd.DataFrame(data)

# Example usage
feed = fetch_recent_changes("2024-11-01T00:00:00Z")
feed.head()  # Output the processed feed



Request URL: https://en.wikipedia.org/w/api.php?action=query&list=recentchanges&rcstart=2024-11-01T00%3A00%3A00Z&rclimit=max&rcprop=title%7Ctimestamp%7Cuser%7Cuserid%7Ccomment&format=json


Unnamed: 0,type,ns,title,user,userid,timestamp,comment,anon
0,edit,0,Agatha All Along (miniseries),Rtkat3,275202,2024-11-01T00:00:00Z,,
1,categorize,14,Units and formations of the Union Army from De...,JJMC89 bot III,35936988,2024-11-01T00:00:00Z,[[:Ahl's Heavy Artillery Company]] removed fro...,
2,categorize,14,Units and formations of the Union army from De...,JJMC89 bot III,35936988,2024-11-01T00:00:00Z,[[:Ahl's Heavy Artillery Company]] added to ca...,
3,edit,0,Ahl's Heavy Artillery Company,JJMC89 bot III,35936988,2024-11-01T00:00:00Z,Moving [[:Category:Units and formations of the...,
4,edit,0,Brad Pitt,GreenC bot,27823944,2024-10-31T23:59:59Z,Move 1 url. [[User:GreenC/WaybackMedic_2.5|Way...,


# Loading

You can select any database you prefer. For this walkthrough, we chose PostgreSQL, which is compatible with dbt for the next steps.

In [10]:
# Database connection parameters
# Load environment variables from a .env file
load_dotenv()
# Database connection parameters
db_params = {
    "dbname": os.getenv("DB_NAME"),  # Database name (replace with your database's name)
    "user": os.getenv("DB_USER"),    # PostgreSQL username (replace with your username)
    "password": os.getenv("POSTGRES_PASSWORD"),    # PostgreSQL password (replace with your password)
    "host": "localhost",   # Host address (e.g., 'localhost' or a specific IP)
    "port": 5432           # Port number (default for PostgreSQL is 5432)
}


In [11]:
def load_to_postgres(df, table_name, db_params):
    """
    Loads data from a DataFrame into a PostgreSQL table.

    Args:
        df (pd.DataFrame): The DataFrame containing the data to be loaded.
        table_name (str): The name of the PostgreSQL table to load data into.
        db_params (dict): A dictionary of PostgreSQL connection parameters.
    """
    try:
        # Establish connection to PostgreSQL
        with psycopg2.connect(**db_params) as conn:
            with conn.cursor() as cursor:
                # Dynamically generate column definitions
                columns = ", ".join([
                    f'"{col}" TEXT' if df[col].dtype == 'object' else
                    f'"{col}" TIMESTAMP' if col == 'timestamp' else
                    f'"{col}" INTEGER' if df[col].dtype == 'int64' else
                    f'"{col}" REAL' if df[col].dtype == 'float64' else
                    f'"{col}" VARCHAR'
                    for col in df.columns
                ])
                
                # Create table dynamically
                cursor.execute(f"""
                    CREATE TABLE IF NOT EXISTS public.{table_name} (
                        {columns}
                    );
                """)

                # Generate placeholders for SQL INSERT query
                insert_query = f"""
                    INSERT INTO public.{table_name} ({", ".join([f'"{col}"' for col in df.columns])}) 
                    VALUES ({", ".join(["%s"] * len(df.columns))})
                """

                # Insert rows in a batch process
                cursor.executemany(
                    insert_query,
                    df.where(pd.notnull(df), None).values.tolist()  # Replace NaNs with None for SQL compatibility
                )
                print(f"Data loaded into table '{table_name}' successfully!")

    except Exception as e:
        # Handle any errors that occur during the process
        print(f"Error: {e}")



In [13]:

load_to_postgres(feed, "wikipedia_api1", db_params)

Data loaded into table 'wikipedia_api1' successfully!


# PART2: Transformation with dbt AND Git implementation

from dotenv import load_dotenv
import os

# Load environment variables from .env
load_dotenv()

# Print the environment variable (optional for debugging)
print(os.getenv('DB_USER'))
