# Importing Libraries

This section imports the libraries used for data processing, API requests, multithreading, and logging:

- `concurrent.futures` for managing threads.
- `datetime` for handling date and time operations.
- `pandas` for data manipulation and analysis.
- `requests` for making HTTP requests.
- `dotenv` for loading environment variables from `.env` files.
- `json` for working with JSON data.
- `os` for interacting with the operating system.
- `time` for time-related functions.
- `logging` for implementing logging functionality.


In [2]:
# Standard library imports
import json
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

# Third-party library imports
import dotenv  # For loading environment variables
import pandas as pd
import requests

# Configuring Logging

In this section, we set up the logging configuration to track the application's execution. This includes:

- Creating a directory for storing log files.
- Generating a log file with a timestamped filename.
- Configuring both file and console handlers for logging.
- Formatting log messages with timestamps and log levels.
- Disabling propagation to the root logger so that messages are not emitted twice.


In [3]:
# Create a directory for logs if it doesn't exist
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)

# Generate a log filename with a timestamp
log_filename = f"{log_dir}/scraping_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

# Configure the logging settings
logger = logging.getLogger(__name__)  # Use __name__ for modular logging
logger.setLevel(logging.INFO)

# File handler for logging to a file
file_handler = logging.FileHandler(log_filename, mode='w')
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter('[%(asctime)s] %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)

# Console handler for logging to the console
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter('[%(asctime)s] %(levelname)s - %(message)s')
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)

# Avoid duplicate log entries
logger.propagate = False
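
One thing to watch in a notebook: each time the configuration cell is re-run, `addHandler` attaches another pair of handlers to the same logger, so every message is printed once per run. The following is a minimal sketch of a guarded setup, assuming a hypothetical helper named `get_scraper_logger` and a temporary directory in place of `logs`; it is one possible arrangement, not the notebook's own code.

```python
import logging
import os
import tempfile
from datetime import datetime


def get_scraper_logger(name: str, log_dir: str) -> logging.Logger:
    """Return a logger with one file and one console handler (hypothetical helper)."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # Assumption: a logger that already has handlers was configured by an
    # earlier run of this cell, so it is returned unchanged.
    if logger.handlers:
        return logger

    os.makedirs(log_dir, exist_ok=True)
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_path = os.path.join(log_dir, f"scraping_log_{stamp}.log")

    formatter = logging.Formatter('[%(asctime)s] %(levelname)s - %(message)s')
    for handler in (logging.FileHandler(log_path, mode='w'), logging.StreamHandler()):
        handler.setLevel(logging.INFO)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


demo_dir = tempfile.mkdtemp()
first = get_scraper_logger("scraper_demo", demo_dir)
second = get_scraper_logger("scraper_demo", demo_dir)  # simulates re-running the cell
handler_count = len(second.handlers)
```

Calling the helper twice returns the same logger object with two handlers, so re-running the cell does not multiply the output.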

# Loading Access Tokens and Saving Scraped Repository Data

This section explains the workflow for loading access tokens, scraping GitHub repository data, and saving the data into a CSV file. Below is a detailed breakdown of the steps:

1. **Load Access Tokens**:

   - The `load_access_tokens` function reads two access tokens, `token_1` and `token_2`, from a `.env` file using the `dotenv` library and raises a `ValueError` if either one is missing.
   - These tokens are essential for authenticating API requests to GitHub.

2. **Scrape GitHub Repositories**:

   - The `scrape_github_repositories` function queries the GitHub Search API (`/search/repositories`) for each predefined search query, requesting 100 results per page.
   - It pages through the results, logs request errors, and sleeps until the rate-limit window resets when no requests remain.

3. **Save Repository Data to CSV**:
   - The `save_repositories_to_csv` function processes the scraped repository data and saves it into a CSV file.
   - This file can be used for further analysis or reporting.

Together, these steps take the workflow from authenticated API access to a CSV file of repository data.


In [4]:
def load_access_tokens():
    """
    Load access tokens from the .env file using the dotenv library.

    Returns:
        tuple: A tuple containing access_token_1 and access_token_2.

    Raises:
        ValueError: If one or both access tokens are missing in the .env file.
    """
    dotenv.load_dotenv()  # Load environment variables from the .env file
    access_token_1 = os.getenv('token_1')
    access_token_2 = os.getenv('token_2')

    # Check if both tokens are loaded successfully
    if not access_token_1 or not access_token_2:
        logger.error("One or both access tokens are missing in the .env file.")
        raise ValueError("One or both access tokens are missing in the .env file.")

    logger.info("Access tokens loaded successfully.")
    return access_token_1, access_token_2
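
For the function above to succeed, the `.env` file needs one `token_1=...` line and one `token_2=...` line. The sketch below illustrates the same presence check with the standard library only, assuming a hypothetical helper `read_required_tokens` and placeholder values written directly to `os.environ` (standing in for what `dotenv.load_dotenv()` would load). Unlike the function above, it names the variable that is missing.

```python
import os


def read_required_tokens(names: tuple[str, ...]) -> list[str]:
    """Return the values of the named environment variables, or raise ValueError."""
    values = [os.getenv(name) for name in names]
    missing = [name for name, value in zip(names, values) if not value]
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")
    return values


# Placeholder value standing in for what dotenv.load_dotenv() would read from .env
os.environ["token_1"] = "placeholder-token-a"
os.environ.pop("token_2", None)  # simulate a .env file without token_2

try:
    read_required_tokens(("token_1", "token_2"))
    error_message = ""
except ValueError as exc:
    error_message = str(exc)
```

Here `error_message` reports `token_2` by name, which can save a round of debugging when only one of the two entries is misspelled.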

In [5]:
def scrape_github_repositories(
    search_queries: list[str],
    access_token: str,
    pages: int = 10
) -> list[dict]:
    """
    Scrape GitHub repositories based on search queries.

    Args:
        search_queries (list[str]): List of search queries.
        access_token (str): GitHub access token for authentication.
        pages (int): Number of pages to scrape per query (default is 10).

    Returns:
        list[dict]: List of scraped repositories.
    """
    repositories_list = []
    headers = {'Authorization': f'token {access_token}'}

    for query in search_queries:
        logger.info(f'Starting to scrape repositories for query: "{query}"...')
        for page in range(1, pages + 1):
            payload = {'q': query, 'per_page': 100, 'page': page}
            url = "https://api.github.com/search/repositories"

            try:
                # A timeout keeps a stalled connection from blocking the loop indefinitely
                response = requests.get(url, headers=headers, params=payload, timeout=30)
                response.raise_for_status()  # Raise an exception for HTTP errors

                items = response.json().get('items', [])
                if items:
                    repositories_list.extend(items)
                    logger.info(f'Page {page} done! Scraped {len(items)} repositories.')
                else:
                    logger.info(f'No repositories found for query: "{query}" on page {page}.')

            except requests.exceptions.RequestException as e:
                logger.error(f'Error on page {page} for query "{query}": {e}')
                break  # Stop processing this query if an error occurs

            # Handle rate limiting: pause until the window resets when no requests remain.
            # A missing header is treated as "not exhausted" rather than as zero.
            remaining = int(response.headers.get("X-RateLimit-Remaining", 1))
            if remaining == 0:
                reset_time = int(response.headers.get("X-RateLimit-Reset", 0))
                sleep_duration = max(reset_time - time.time() + 1, 0)
                logger.info(f'Rate limit reached. Sleeping for {sleep_duration:.0f} seconds...')
                time.sleep(sleep_duration)

    logger.info('Scraping completed.')
    return repositories_list
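
The rate-limit wait is simple arithmetic on two response headers: `X-RateLimit-Remaining` (requests left in the current window) and `X-RateLimit-Reset` (the epoch second at which the window resets). The sketch below isolates that calculation in a hypothetical pure function, `seconds_until_reset`, so it can be checked without calling the API. The header values and the fixed `now` are made up for illustration.

```python
import time
from typing import Mapping, Optional


def seconds_until_reset(headers: Mapping[str, str], now: Optional[float] = None) -> float:
    """Return how long to wait before the next request; 0.0 if no wait is needed."""
    now = time.time() if now is None else now
    # Assumption: a missing header means "unknown", which is treated as "not exhausted".
    remaining = int(headers.get("X-RateLimit-Remaining", 1))
    if remaining > 0:
        return 0.0
    reset_epoch = int(headers.get("X-RateLimit-Reset", 0))
    # One extra second of slack, clamped so a reset time in the past never gives a negative wait.
    return max(reset_epoch - now + 1, 0.0)


wait_exhausted = seconds_until_reset(
    {"X-RateLimit-Remaining": "0", "X-RateLimit-Reset": "1700000060"}, now=1700000000.0
)
wait_available = seconds_until_reset({"X-RateLimit-Remaining": "12"}, now=1700000000.0)
wait_stale = seconds_until_reset(
    {"X-RateLimit-Remaining": "0", "X-RateLimit-Reset": "1699999000"}, now=1700000000.0
)
```

With the window resetting 60 seconds after `now`, the wait is 61 seconds. With requests remaining, or with a reset time already in the past, it is zero.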

In [6]:
def save_repositories_to_csv(
        repositories: list[dict],
        output_file: str = 'repositories_data.csv'
        ) -> None:
    """
    Save repository data to a CSV file.

    Args:
        repositories (list[dict]): List of repository data dictionaries.
        output_file (str): Name of the output CSV file (default is 'repositories_data.csv').

    Returns:
        None
    """
    if not repositories:
        logger.warning("No repositories to save. The repositories list is empty.")
        return

    try:
        # Extract relevant fields from each repository
        repository_records = [
            {
                'Repository Name': repo.get('name', 'N/A'),
                'Owner Username': repo.get('owner', {}).get('login', 'N/A'),
                'Owner Profile URL': repo.get('owner', {}).get('html_url', 'N/A'),
                'Owner Type': repo.get('owner', {}).get('type', 'N/A'),
                'Description': repo.get('description') or 'N/A',  # value may be None even when the key exists
                'Created Date': repo.get('created_at', 'N/A'),
                'Updated Date': repo.get('updated_at', 'N/A'),
                'Last Pushed Date': repo.get('pushed_at', 'N/A'),
                'Repository Size (KB)': repo.get('size', 'N/A'),
                'Stars Count': repo.get('stargazers_count', 0),
                'Watchers Count': repo.get('watchers_count', 0),
                'Primary Language': repo.get('language') or 'N/A',  # value may be None even when the key exists
                'Forks Count': repo.get('forks_count', 0),
                'Open Issues Count': repo.get('open_issues_count', 0),
                'Topics': ', '.join(repo.get('topics', [])),
                'Default Branch': repo.get('default_branch', 'N/A')
            }
            for repo in repositories
        ]

        # Convert to DataFrame and save to CSV
        df = pd.DataFrame(repository_records)
        df.to_csv(output_file, index=False, encoding='utf-8')
        logger.info(f"Repository data successfully saved to '{output_file}'.")

    except Exception as e:
        logger.error(f"An error occurred while saving repositories to CSV: {e}")
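
A subtlety in the field extraction is that `dict.get(key, default)` falls back to the default only when the key is absent. When the key is present with a value of `None` (as JSON `null` fields are after parsing), `None` is returned. The sketch below shows the difference with a hypothetical helper, `value_or_default`, and a made-up record.

```python
def value_or_default(record: dict, key: str, default: str = 'N/A'):
    """Return record[key], or the default when the key is absent or its value is None."""
    value = record.get(key)
    return default if value is None else value


# Hypothetical record shaped like a search result with unset fields
sample_repo = {'name': 'demo', 'description': None, 'size': 0}

plain_get = sample_repo.get('description', 'N/A')           # key exists, so the default is ignored
with_helper = value_or_default(sample_repo, 'description')  # None is replaced by the default
zero_kept = value_or_default(sample_repo, 'size')           # a legitimate 0 is preserved
absent_key = value_or_default(sample_repo, 'language')      # missing key falls back as usual
```

Comparing against `None` explicitly, rather than writing `record.get(key) or default`, matters for numeric fields: the `or` form would also replace a genuine `0` or an empty string.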

# Scraped Users and Organizations Data

In this section, we analyze the scraped repository data to extract unique individual users and organizations. The extracted data is then saved into separate CSV files for further analysis or reporting.

Below is an explanation of the subsequent cells:

1. **Analyze Users and Organizations**:

   - The `analyze_users_and_organizations` function reads the repository data from a CSV file.
   - It identifies and separates unique individual users and organizations based on the "Owner Type" field.

2. **Fetch User Details**:

   - The `fetch_user_details` function retrieves detailed information about a specific GitHub user using the GitHub API.

3. **Fetch Organization Details**:

   - The `fetch_organization_data` function retrieves detailed information about a specific GitHub organization using the GitHub API.

4. **Save Users Data to CSV**:

   - The `save_users_data_to_csv` function saves the detailed user data into a CSV file.

5. **Save Organizations Data to CSV**:
   - The `save_organizations_data_to_csv` function saves the detailed organization data into a CSV file.


In [7]:
def analyze_users_and_organizations(
        csv_file: str
        ) -> tuple[list[str], list[str]]:
    """
    Analyze the CSV file to extract unique individual users and organizations.

    Args:
        csv_file (str): Path to the CSV file containing repository data.

    Returns:
        tuple: A tuple containing two lists - unique individual users and unique organizations.
    """
    try:
        # Read the CSV file into a DataFrame
        df = pd.read_csv(csv_file)

        # Extract unique users and organizations
        unique_users = df[df["Owner Type"] == 'User']['Owner Username'].dropna().unique().tolist()
        unique_organizations = df[df["Owner Type"] == 'Organization']['Owner Username'].dropna().unique().tolist()

        # Log the results
        logger.info(f'Total Individual Users: {len(unique_users)}')
        logger.info(f'Total Organizations: {len(unique_organizations)}')

        return unique_users, unique_organizations

    except FileNotFoundError:
        logger.error(f"The file '{csv_file}' was not found.")
        return [], []

    except Exception as e:
        logger.error(f"An error occurred while analyzing the file '{csv_file}': {e}")
        return [], []
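
To check the filtering logic without touching the API, the same split can be reproduced on a small in-memory CSV. The sketch below uses the standard-library `csv` module rather than pandas so that it runs without extra dependencies. The helper name `split_owners` and the sample rows are made up for illustration.

```python
import csv
import io

sample_csv = io.StringIO(
    "Repository Name,Owner Username,Owner Type\n"
    "repo-a,alice,User\n"
    "repo-b,acme,Organization\n"
    "repo-c,alice,User\n"
    "repo-d,bob,User\n"
)


def split_owners(csv_source) -> tuple[list[str], list[str]]:
    """Return (users, organizations) with duplicates removed, in first-seen order."""
    users: dict[str, None] = {}
    organizations: dict[str, None] = {}
    for row in csv.DictReader(csv_source):
        owner, owner_type = row.get("Owner Username"), row.get("Owner Type")
        if not owner:
            continue  # mirrors dropna(): skip rows without an owner name
        if owner_type == "User":
            users[owner] = None
        elif owner_type == "Organization":
            organizations[owner] = None
    return list(users), list(organizations)


unique_users, unique_organizations = split_owners(sample_csv)
```

The owner `alice` appears in two rows but is returned once, which is the property the later per-user API calls depend on.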

In [8]:
def fetch_user_details(
        username: str,
        token: str
        ) -> dict:
    """
    Fetch detailed information about a GitHub user.

    Args:
        username (str): GitHub username.
        token (str): GitHub access token for authentication.

    Returns:
        dict or None: A dictionary containing user details if successful, otherwise None.
    """
    url = f"https://api.github.com/users/{username}"
    headers = {'Authorization': f'token {token}'}

    try:
        # A timeout keeps a stalled connection from blocking a worker thread indefinitely
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()  # Raise an exception for HTTP errors

        user_data = response.json()
        user_info = {
            'Username': user_data.get('login', 'N/A'),
            'User ID': user_data.get('id', 'N/A'),
            'Profile URL': user_data.get('html_url', 'N/A'),
            'Name': user_data.get('name', 'N/A'),
            'Company': user_data.get('company', 'N/A'),
            'Location': user_data.get('location', 'N/A'),
            'Bio': user_data.get('bio', 'N/A'),
            'Blog': user_data.get('blog', 'N/A'),
            'Twitter Username': user_data.get('twitter_username', 'N/A'),
            'Email': user_data.get('email', 'N/A'),
            'Public Repos': user_data.get('public_repos', 0),
            'Public Gists': user_data.get('public_gists', 0),
            'Followers': user_data.get('followers', 0),
            'Following': user_data.get('following', 0),
            'Account Created': user_data.get('created_at', 'N/A'),
            'Last Updated': user_data.get('updated_at', 'N/A')
        }
        logger.info(f"Fetched data for user '{username}' successfully.")
        return user_info

    except requests.exceptions.HTTPError as http_err:
        logger.error(f"HTTP error occurred while fetching data for user '{username}': {http_err}")
    except requests.exceptions.RequestException as req_err:
        logger.error(f"Request exception occurred for user '{username}': {req_err}")
    except Exception as e:
        logger.error(f"An unexpected error occurred for user '{username}': {e}")

    return None


In [9]:
def fetch_organization_data(
        org_username: str,
        token: str
        ) -> dict:
    """
    Fetch detailed information about a GitHub organization.

    Args:
        org_username (str): GitHub organization username.
        token (str): GitHub access token for authentication.

    Returns:
        dict or None: A dictionary containing organization details if successful, otherwise None.
    """
    url = f"https://api.github.com/users/{org_username}"
    headers = {'Authorization': f'token {token}'}

    try:
        # A timeout keeps a stalled connection from blocking a worker thread indefinitely
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()  # Raise an exception for HTTP errors

        org_data = response.json()
        org_info = {
            'Organization Username': org_data.get('login', 'N/A'),
            'Organization ID': org_data.get('id', 'N/A'),
            'Profile URL': org_data.get('html_url', 'N/A'),
            'Full Name': org_data.get('name', 'N/A'),
            'Location': org_data.get('location', 'N/A'),
            'Bio': org_data.get('bio', 'N/A'),
            'Blog URL': org_data.get('blog', 'N/A'),
            'Twitter Username': org_data.get('twitter_username', 'N/A'),
            'Email': org_data.get('email', 'N/A'),
            'Public Repositories': org_data.get('public_repos', 0),
            'Public Gists': org_data.get('public_gists', 0),
            'Followers Count': org_data.get('followers', 0),
            'Following Count': org_data.get('following', 0),
            'Account Created At': org_data.get('created_at', 'N/A'),
            'Last Updated At': org_data.get('updated_at', 'N/A')
        }
        logger.info(f"Fetched data for organization '{org_username}' successfully.")
        return org_info

    except requests.exceptions.HTTPError as http_err:
        logger.error(f"HTTP error occurred while fetching data for organization '{org_username}': {http_err}")
    except requests.exceptions.RequestException as req_err:
        logger.error(f"Request exception occurred for organization '{org_username}': {req_err}")
    except Exception as e:
        logger.error(f"An unexpected error occurred for organization '{org_username}': {e}")

    return None
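
The two fetch functions differ only in their column names, so the field extraction could be driven by a mapping instead of two hand-written dictionaries. The following is a minimal sketch of that idea, assuming a hypothetical helper `extract_fields` and abbreviated, illustrative mappings. It is an optional refactoring, not part of the notebook.

```python
def extract_fields(payload: dict, field_map: dict[str, tuple[str, object]]) -> dict:
    """Build a flat record from an API payload.

    field_map maps an output column name to (payload key, default value).
    """
    return {
        column: payload.get(key, default)
        for column, (key, default) in field_map.items()
    }


# Abbreviated, illustrative mappings; the full column lists are in the functions above.
USER_FIELDS = {
    'Username': ('login', 'N/A'),
    'Public Repos': ('public_repos', 0),
}
ORG_FIELDS = {
    'Organization Username': ('login', 'N/A'),
    'Public Repositories': ('public_repos', 0),
}

sample_payload = {'login': 'octo-demo', 'public_repos': 3}  # made-up payload
user_record = extract_fields(sample_payload, USER_FIELDS)
org_record = extract_fields(sample_payload, ORG_FIELDS)
```

With this arrangement, adding a column means adding one entry to a mapping, and the request and error-handling code can be shared between users and organizations.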

In [10]:
def save_users_data_to_csv(
        users_data: list[dict],
        file_name: str = 'users_data.csv'
        ) -> None:
    """
    Save user data to a CSV file.

    Args:
        users_data (list[dict]): List of user data dictionaries.
        file_name (str): Name of the output CSV file (default is 'users_data.csv').

    Returns:
        None
    """
    if not users_data:
        logger.info("No user data to save. The users_data list is empty.")
        return

    try:
        # Convert the list of dictionaries to a DataFrame
        df = pd.DataFrame(users_data)
        df.fillna('N/A', inplace=True)  # Replace NaN values with 'N/A'

        # Save the DataFrame to a CSV file
        df.to_csv(file_name, index=False, encoding='utf-8')
        logger.info(f"User data successfully saved to '{file_name}'.")

    except Exception as e:
        logger.error(f"An error occurred while saving user data to CSV: {e}")

In [11]:
def save_organizations_data_to_csv(
        organizations_data: list[dict],
        file_name: str = 'organizations_data.csv'
        ) -> None:
    """
    Save organization data to a CSV file.

    Args:
        organizations_data (list[dict]): List of organization data dictionaries.
        file_name (str): Name of the output CSV file (default is 'organizations_data.csv').

    Returns:
        None
    """
    if not organizations_data:
        logger.info("No organization data to save. The organizations_data list is empty.")
        return

    try:
        # Convert the list of dictionaries to a DataFrame
        df = pd.DataFrame(organizations_data)
        df.fillna('N/A', inplace=True)  # Replace NaN values with 'N/A'

        # Save the DataFrame to a CSV file
        df.to_csv(file_name, index=False, encoding='utf-8')
        logger.info(f"Organization data successfully saved to '{file_name}'.")

    except Exception as e:
        logger.error(f"An error occurred while saving organization data to CSV: {e}")

# Main Function to Execute the Workflow

The `main` function orchestrates the entire workflow by calling the previously defined functions in sequence: it loads the access tokens, scrapes GitHub repositories, extracts the users and organizations, and saves each dataset to a CSV file.

### Workflow Steps:

1. **Load Access Tokens**:

   - The function begins by loading the required GitHub access tokens from the `.env` file using the `load_access_tokens` function.
   - These tokens are essential for authenticating API requests.

2. **Define Search Queries**:

   - A list of predefined search queries is used to scrape repositories related to various domains such as front-end, back-end, data science, etc.

3. **Scrape Repositories**:

   - The `scrape_github_repositories` function fetches repository data from GitHub based on the search queries.
   - The scraped data is saved into a CSV file using the `save_repositories_to_csv` function.

4. **Analyze Users and Organizations**:

   - The `analyze_users_and_organizations` function processes the repository data to extract unique individual users and organizations.

5. **Fetch User Details**:

   - Using a thread pool, the `fetch_user_details` function retrieves detailed information about each user. The first 4,000 requests use the first token, and the remaining requests use the second.
   - The data is saved into a CSV file using the `save_users_data_to_csv` function.

6. **Fetch Organization Details**:

   - Similarly, the `fetch_organization_data` function retrieves detailed information about each organization using a thread pool and the second token.
   - The data is saved into a CSV file using the `save_organizations_data_to_csv` function.

7. **Logging and Error Handling**:
   - Throughout the workflow, logging is used to track progress and handle errors gracefully.

Each stage runs inside its own `try`/`except` block, so a failure is logged and `main` returns before the later stages run.


In [12]:
def main():
    try:
        # Load access tokens; load_access_tokens logs success or failure itself
        access_token_1, access_token_2 = load_access_tokens()
    except ValueError:
        return

    # Define search queries
    search_queries = [
        'front-end', 'back-end', 'full-stack', 'web-development',
        'mobile-development', 'data-science', 'machine-learning',
        'artificial-intelligence', 'cloud-computing', 'cybersecurity'
    ]

    try:
        # Scrape repositories and save to CSV
        repositories_data = scrape_github_repositories(search_queries, access_token_1, pages=10)
        save_repositories_to_csv(repositories_data, 'repositories_data.csv')
    except Exception as e:
        logger.error(f"Error during repository scraping or saving: {e}")
        return

    try:
        # Analyze users and organizations
        users, organizations = analyze_users_and_organizations('repositories_data.csv')
    except Exception as e:
        logger.error(f"Error analyzing users and organizations: {e}")
        return

    try:
        # Fetch user details using multithreading
        with ThreadPoolExecutor(max_workers=100) as executor:
            future_tasks = [
                executor.submit(fetch_user_details, username, access_token_1 if i < 4000 else access_token_2)
                for i, username in enumerate(users)
            ]
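            # Collect each result once, then drop the failed requests (None)
            user_results = [future.result() for future in future_tasks]
            all_user_details = [result for result in user_results if result is not None]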
        save_users_data_to_csv(all_user_details, 'users_data.csv')
    except Exception as e:
        logger.error(f"Error fetching or saving user details: {e}")
        return

    try:
        # Fetch organization details using multithreading
        with ThreadPoolExecutor(max_workers=100) as executor:
            future_tasks = [
                executor.submit(fetch_organization_data, org_username, access_token_2)
                for org_username in organizations
            ]
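            # Collect each result once, then drop the failed requests (None)
            org_results = [future.result() for future in future_tasks]
            all_organizations_data = [result for result in org_results if result is not None]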
        save_organizations_data_to_csv(all_organizations_data, 'organizations_data.csv')
    except Exception as e:
        logger.error(f"Error fetching or saving organization details: {e}")
        return

    logger.info("Data scraping and saving completed successfully.")
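
The thread-pool pattern used for users and organizations can be tried in isolation. The sketch below assumes a stand-in `fake_fetch` function in place of the real API call and a hypothetical `pick_token` helper that generalizes the `i < 4000` switch to any number of tokens. The usernames and tokens are placeholders.

```python
from concurrent.futures import ThreadPoolExecutor


def pick_token(index: int, tokens: list[str], per_token: int) -> str:
    """Return the token for the index-th request, moving to the next token every per_token requests."""
    return tokens[min(index // per_token, len(tokens) - 1)]


def fake_fetch(username: str, token: str):
    """Stand-in for fetch_user_details: returns None for one name to imitate a failed request."""
    if username == "missing-user":
        return None
    return {"Username": username, "Token Used": token}


usernames = ["alice", "missing-user", "bob", "carol"]  # made-up names
tokens = ["token-a", "token-b"]                        # placeholder tokens

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(fake_fetch, name, pick_token(i, tokens, per_token=2))
        for i, name in enumerate(usernames)
    ]
    # result() blocks until each task finishes; iterating in submission order keeps the input order
    results = [future.result() for future in futures]

details = [record for record in results if record is not None]
```

Because the futures are read in submission order, `details` keeps the order of `usernames` with the failed entry removed, even though the tasks themselves may finish in any order.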

# Running the Main Function When `__name__ == "__main__"`

This section ensures that the `main` function is executed only when the script is run directly, and not when it is imported as a module. This is achieved using the `if __name__ == "__main__":` condition, which is a common Python convention for defining the entry point of a program.


In [None]:
if __name__ == "__main__":
    logger.info("Starting the GitHub scraping workflow...")
    main()