In [1]:
import requests
import pandas as pd
import os
from datetime import datetime

import os
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

In [3]:
import re
archive_dir = 'raw_data_archive'
files = [f for f in os.listdir(archive_dir) if f.startswith("ctg-studies_") and f.endswith(".csv")]

if not files:
    print("No archived files found.")

# Extract timestamps and sort by most recent
files.sort(key=lambda f: time.strptime(re.search(r'_(\d{8}_\d{6})', f).group(1), "%Y%m%d_%H%M%S"), reverse=True)
latest_file = files[0]
print(latest_file)

ctg-studies_20250301_232318.csv


In [None]:
def scrape_ctg():
    # 📌 Set download directory to current folder
    download_dir = os.getcwd()  

    # **Set Chrome options for automatic downloads**
    options = Options()
    options.add_argument("--start-maximized")  # Maximize browser
    options.add_argument("--disable-blink-features=AutomationControlled")  # Reduce bot detection
    prefs = {
        "download.default_directory": download_dir,  # Save downloads in current directory
        "download.prompt_for_download": False,       # No prompt
        "download.directory_upgrade": True,
        "safebrowsing.enabled": True                 # Allow safe downloads
    }
    options.add_experimental_option("prefs", prefs)

    # **Initialize WebDriver**
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)

    # **Open the website**
    driver.get("https://clinicaltrials.gov/search")

    try:
        # 🕒 Wait for button to open modal
        main_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CLASS_NAME, "action-bar-button"))
        )
        main_button.click()
        
        # 🕒 Wait for download button inside the modal
        download_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, "//button[contains(@class, 'usa-button') and contains(@class, 'primary-button')]"))
        )

        # ✅ Scroll into view before clicking
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", download_button)
        time.sleep(1)  # Allow scrolling time

        # ✅ Click the download button
        download_button.click()
        print("✅ Download started!")

        # 🕒 Wait for the file to be downloaded
        timeout = 180  # Max wait time (seconds)
        start_time = time.time()

        while time.time() - start_time < timeout:
            # List files in the download directory
            files = os.listdir(download_dir)
            
            # Check if the `.crdownload` file still exists
            crdownload_files = [f for f in files if f.endswith(".crdownload")]
            if not crdownload_files:
                # If no `.crdownload` files exist, assume download is complete
                completed_file = [f for f in files if f.endswith(".csv")][0] if [f for f in files if f.endswith(".csv")] else None
                if completed_file:
                    print(f"✅ File downloaded: {completed_file}")
                    break
            
            # Wait and check again
            time.sleep(1)

        if crdownload_files:
            print("❌ Download did not complete within the expected time frame.")

    except Exception as e:
        print("❌ Error:", e)

    # Print title to confirm success
    print("Page Title:", driver.title)

    # Close the driver
    driver.quit()

In [None]:
import requests
from bs4 import BeautifulSoup
import csv

# URL to scrape
def scrape_eudraCT_page(page, writer):
    URL = f"https://www.clinicaltrialsregister.eu/ctr-search/search?query=&page={page}"

    # Send a GET request
    response = requests.get(URL)

    # Parse HTML content using BeautifulSoup
    soup = BeautifulSoup(response.content, "html.parser")
    results_div = soup.find("div", class_="results")
    tables = results_div.find_all("table", class_="result")
    # Loop through each table
    # with open("clinical_trials.csv", "w", newline="", encoding="utf-8") as file:
    #     writer = csv.writer(file)
            
    # Write the header row
    writer.writerow(["Study Identifier", "Sponsor Name", "Study Name", "Medical Condition"])

    # Loop through each table
    for table_index, table in enumerate(tables, start=1):

        # Find all rows (tr) in the table
        rows = table.find_all("tr")

        # Initialize variables to store extracted data
        study_identifier = sponsor_name = study_name = condition = "N/A"

        for row_index, row in enumerate(rows, start=1):
            # Get all table cells (td) within each row
            cells = row.find_all("td")
            row_data = [cell.text.strip() for cell in cells]  # Extract text from each cell
                    
            # Extract relevant data from each row
            if row_index == 1 and row_data:
                study_identifier = row_data[0].split(":")[1].strip()
            if row_index == 2 and row_data:
                sponsor_name = row_data[0].split(":")[1].strip() 
            if row_index == 3 and row_data:
                study_name = row_data[0].split(":")[1].strip() 
            if row_index == 4 and row_data:
                condition = row_data[0].split(":")[1].strip()

        # Write extracted data into the CSV file
        writer.writerow([study_identifier, sponsor_name, study_name, condition])

In [None]:
with open("clinical_trials.csv", "w", newline="", encoding="utf-8") as file:
    writer = csv.writer(file)
    for page in range(1,4):
        scrape_eudraCT_page(page, writer)

In [None]:
def scrape_eudraCT(file_name, start_page, end_page):
    with open(file_name, "w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        for page in range(start_page, end_page+1):
            scrape_eudraCT_page(page, writer)

In [None]:
scrape_eudraCT("clinical_trials.csv",start_page=1, end_page=3)

In [55]:
import psycopg2

# Set up the database connection
conn = psycopg2.connect(
    dbname='miracle_scrap',
    user='edwardch',
    password='123456',
    host='localhost',
    port='5434'  # Default PostgreSQL port
)

cursor = conn.cursor()
table_name = 'us'
df = pd.read_csv("clinical_trials.csv")
# Overwrite: Delete existing data before inserting new data
cursor.execute(f"TRUNCATE TABLE raw.{table_name} RESTART IDENTITY;")
conn.commit()
print(f"Cleared existing data from {table_name}")

# Insert new data
for _, row in df.iterrows():
    cursor.execute(f"""
        INSERT INTO raw.{table_name} (study_identifier, study_title, conditions, sponsor)
        VALUES (%s, %s, %s, %s);
    """, (row['Study Identifier'], row['Study Name'], row['Medical Condition'], row['Sponsor Name']))

conn.commit()
print(f"Inserted {len(df)} records into {table_name}")

# Close connection
cursor.close()
conn.close()

Cleared existing data from us
Inserted 62 records into us


In [None]:
def store_csv(file_name, table_name):
    try:
        # Read CSV into a DataFrame
        df = pd.read_csv(file_name)
        
        # renaming EudraCT columns to match the format of clinicaltrials.gov
        if table_name == 'eu':
            df.columns = ['study_identifier', 'study_title', 'study_url', 'status', 'conditions', 'interventions', 'sponsor', 'collaborators', 'study_type']

        # Connect to PostgreSQL
        conn = psycopg2.connect(
            dbname='miracle_scrap',
            user='edwardch',
            password='123456',
            host='localhost',
            port='5434'  # Default PostgreSQL port
        )
        cursor = conn.cursor()

        # Overwrite: Delete existing data before inserting new data
        cursor.execute(f"TRUNCATE TABLE raw.{table_name} RESTART IDENTITY;")
        conn.commit()
        print(f"Cleared existing data from {table_name}")

        # Insert new data
        for _, row in df.iterrows():
            cursor.execute(f"""
                INSERT INTO raw.{table_name} (study_identifier, study_title, conditions, sponsor)
                VALUES (%s, %s, %s, %s);
            """, (row['study_identifier'], row['study_title'], row['conditions'], row['sponsor']))

        conn.commit()
        print(f"Inserted {len(df)} records into {table_name}")

        # Close connection
        cursor.close()
        conn.close()

    except Exception as e:
        print(f"Error: {e}")

In [59]:
curr = os.getcwd()
download = os.path.join(curr,'download')
print(download)

/Users/edwardch/Desktop/miracle_scraper/download
