## Functions from src

In [2]:
import argparse
import os
import ast

import pandas as pd
import time
import re
import requests
import nbformat

from collections import OrderedDict

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException



def _read_labelled_value(driver, label):
    """Return the text of the first <p> found under the parent of the element whose text equals *label*.

    Raises NoSuchElementException (from Selenium) when the label or the <p> is missing.
    """
    label_element = driver.find_element(By.XPATH, f"//*[text()='{label}']")
    parent = label_element.find_element(By.XPATH, "..")
    return parent.find_element(By.TAG_NAME, "p").text


def _extract_import_names(code_text):
    """Return the distinct names following a line-leading `import`/`from`, in first-seen order.

    Only lines that start at column 0 with the keyword followed by a space are considered,
    so identifiers such as `fromage = 1` are not mistaken for imports.
    """
    names = []
    for line in code_text.split('\n'):
        if not line.startswith(('import ', 'from ')):
            continue
        tokens = line.split()  # whitespace split: tolerates repeated spaces after the keyword
        if len(tokens) > 1:
            names.append(tokens[1])
    return list(OrderedDict.fromkeys(names))


def PARSING_COMPETITION_NOTEBOOKS(COMPETITION_URL, SORT_BY='public score', PYTHON_ONLY=True, excludeNonAccessedDatasources=True, NOTEBOOKS_AMOUNT = 5, config = None):
    """Scrape notebooks of a Kaggle competition with Firefox/Selenium into a DataFrame.

    Parameters
    ----------
    COMPETITION_URL : str
        URL of the competition page; the code-listing URL is derived from it.
    SORT_BY : str
        'public score', 'vote count', or anything else (sorted by comment count).
    PYTHON_ONLY : bool
        Adds the `language=Python` filter to the listing URL.
    excludeNonAccessedDatasources : bool
        Adds the `excludeNonAccessedDatasources=true` filter to the listing URL.
    NOTEBOOKS_AMOUNT : int
        Maximum number of listed notebooks to parse (ignored for the links themselves
        when a custom list is supplied; still used in the progress message).
    config : dict or None
        Feature flags. Keys read here: 'MY NOTEBOOKS', 'CUSTOM NOTEBOOKS LIST', 'NAME',
        'UPDATE DATE', 'UPVOTES', 'MEDAL', 'PUBLIC SCORE', 'PRIVATE SCORE', 'RUN TIME',
        'VIEWS', 'DATA SOURCES', 'NOTEBOOK CELLS', 'PYTHON LIBRARIES'.
        A missing flag is treated as disabled.

    Returns
    -------
    pandas.DataFrame
        One row per notebook. Values that are disabled or could not be read are the
        string 'None' (or ['None'] / [] for the list columns).

    Raises
    ------
    ValueError
        If the competition id cannot be extracted from the header image URL.
    RuntimeError
        If no kernel id can be determined for a notebook.
    requests.HTTPError
        If downloading the notebook source fails.
    """
    config = {} if config is None else config

    def enabled(key):
        # Missing flags count as "off" instead of raising KeyError half-way through a scrape.
        return bool(config.get(key))

    columns = ["notebook_name", "notebook_url", "public_score", "private_score", "medal", "upvotes", "views", "run_time_info", "last_updated", "notebook_full_text", "code_text", "markdowns_text", "input_datasources", "python_libraries"]
    df = pd.DataFrame(columns=columns)

    start_time = time.time()
    print('PARSING LINKS,  Time = ', 0)

    # -------------------------------------------------------------------------------------------------------------------------------------------------------
    # BLOCKING ALL DOWNLOADS
    firefox_options = webdriver.FirefoxOptions()
    firefox_options.set_preference("browser.download.folderList", 2)  # Use the last directory specified for downloads
    firefox_options.set_preference("browser.download.dir", "./downloads")  # Download directory (original author's note: it does not exist, so nothing is downloaded)
    firefox_options.set_preference("browser.download.useDownloadDir", True)  # Use the specified download directory
    firefox_options.set_preference("browser.download.manager.showWhenStarting", False)  # Do not show download manager
    firefox_options.set_preference("browser.helperApps.neverAsk.saveToDisk", "application/octet-stream")  # Prevent automatic download of certain file types
    firefox_options.headless = True  # Run in headless mode to hide browser window
    driver = webdriver.Firefox(options=firefox_options)

    # try/finally guarantees the browser process is closed even when a page fails to parse.
    try:
        # Load the competition page
        driver.get(COMPETITION_URL)

        # Find the competition id through the header (banner image) URL
        header_xpath = "//img[contains(@src, '/competitions/') and contains(@src, '/images/header')]"
        header_image = WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.XPATH, header_xpath)))
        required_str = header_image.get_attribute('src')
        id_match = re.search(r'/(\d+)/', required_str)
        if id_match is None:
            raise ValueError(f"Could not extract a competition id from header image URL: {required_str!r}")
        competition_id = id_match.group(1)

        # FILTERS
        COMPETITION_URL += f'/code?competitionId={competition_id}'

        if SORT_BY == 'public score':
            COMPETITION_URL += '&sortBy=scoreDescending'
        elif SORT_BY == 'vote count':
            COMPETITION_URL += '&sortBy=voteCount'
        else:
            COMPETITION_URL += '&sortBy=commentCount'

        if PYTHON_ONLY:
            COMPETITION_URL += '&language=Python'
        if excludeNonAccessedDatasources:
            COMPETITION_URL += '&excludeNonAccessedDatasources=true'

        # Load the filtered code listing
        driver.get(COMPETITION_URL)

        MAX_NOTEBOOKS = NOTEBOOKS_AMOUNT

        if enabled('MY NOTEBOOKS'):
            # Copy so the caller's list is never aliased by this function.
            notebook_links = list(config['CUSTOM NOTEBOOKS LIST'])
        else:
            site_content = WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.XPATH, '//*[@id="site-content"]')))
            # One scroll per 15 requested notebooks, pausing so the listing can load more items.
            for _ in range(MAX_NOTEBOOKS // 15):
                driver.execute_script("arguments[0].scrollBy(0, 50000);", site_content)
                time.sleep(1.5)

            # Each listed notebook exposes a ".../comments" link; strip that 9-character suffix to get the notebook URL.
            comments_xpath = "//a[contains(@href, '/code/') and contains(@href, '/comments')]"
            WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.XPATH, comments_xpath)))
            list_items = driver.find_elements(By.XPATH, comments_xpath)
            # Slicing (instead of indexing up to MAX_NOTEBOOKS) avoids IndexError when fewer links are listed.
            notebook_links = [item.get_attribute('href')[:-9] for item in list_items[:MAX_NOTEBOOKS]]
            if len(notebook_links) < MAX_NOTEBOOKS:
                print(f'ONLY {len(notebook_links)} NOTEBOOK LINKS FOUND (requested {MAX_NOTEBOOKS})')

        print('LINKS DONE,  Time = ', time.time() - start_time)

        # -------------------------------------------------------------------------------------------------------------------------------------------------------

        for url_count, NOTEBOOK_URL in enumerate(notebook_links, start=1):
            print(f'PARSING INFO FOR NOTEBOOK № {url_count}/{MAX_NOTEBOOKS},  Time = ', time.time() - start_time)

            # Load the notebook page
            driver.get(NOTEBOOK_URL)

            # NOTEBOOK NAME
            notebook_name = 'None'
            if enabled('NAME'):
                try:
                    # WebDriverWait raises TimeoutException (not NoSuchElementException) when the title never appears.
                    notebook_name = WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.XPATH, "//div[@wrap='hide']//h1"))).text
                except (NoSuchElementException, TimeoutException):
                    notebook_name = 'None'

            # UPDATE LAST TIME
            update_date = 'None'
            if enabled('UPDATE DATE'):
                try:
                    # Drop the last 4 characters of the aria-label (matched on containing 'ago').
                    update_date = driver.find_element(By.XPATH, "//span[contains(@aria-label, 'ago')]").get_attribute('aria-label')[:-4]
                    if update_date == 'a day': update_date = '1 day'
                    if update_date == 'a month': update_date = '1 month'
                    if update_date == 'a year': update_date = '1 year'
                except NoSuchElementException:
                    update_date = 'None'

            # UPVOTES
            upvotes_amount = 'None'
            if enabled('UPVOTES'):
                try:
                    upvotes_amount = int(driver.find_element(By.XPATH, ".//button[contains(@aria-label, 'votes')]").text)
                except (NoSuchElementException, ValueError):
                    # ValueError: the button text was not a plain integer.
                    upvotes_amount = 'None'

            # MEDAL
            medal = 'None'
            if enabled('MEDAL'):
                try:
                    # Drop the last 6 characters of the image alt text (presumably a " medal" suffix - confirm).
                    medal = driver.find_element(By.XPATH, "//img[contains(@src, '/static/images/medals/notebooks/') and contains(@src, '.png')]").get_attribute('alt')[:-6]
                except NoSuchElementException:
                    medal = 'None'

            # PUBLIC SCORE
            public_score = 'None'
            if enabled('PUBLIC SCORE'):
                try:
                    public_score = float(_read_labelled_value(driver, 'Public Score'))
                except (NoSuchElementException, ValueError):
                    public_score = 'None'

            # PRIVATE SCORE
            private_score = 'None'
            if enabled('PRIVATE SCORE'):
                try:
                    private_score = float(_read_labelled_value(driver, 'Private Score'))
                except (NoSuchElementException, ValueError):
                    private_score = 'None'

            # RUN TIME
            run_time_info = 'None'
            if enabled('RUN TIME'):
                try:
                    run_time_info = _read_labelled_value(driver, 'Run')
                except NoSuchElementException:
                    run_time_info = 'None'

            # VIEWS
            views_amount = 'None'
            if enabled('VIEWS'):
                try:
                    text = driver.find_element(By.XPATH, "//*[text()='views']").text
                    views_match = re.search(r'\b(\d+\s*)+ VIEWS$', text)
                    # A non-matching text used to crash with AttributeError; fall back to 'None' instead.
                    if views_match is not None:
                        views_amount = int(views_match.group(0).replace(' VIEWS', '').replace(' ', ''))
                except NoSuchElementException:
                    views_amount = 'None'

            # -------------------------------------------------------------------------------------------------------------------------------------------------------

            time.sleep(1.5)

            # click ':' (matched by its 'more_vert' text, since CSS selectors change every time)
            more_vert_xpath = "//*[contains(., 'more_vert')]"
            WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.XPATH, more_vert_xpath)))
            driver.find_element(By.XPATH, more_vert_xpath).click()

            # NOTE(review): the original assigned "Download code" to a misspelled variable
            # (`innter_html_content`), so this second click has always targeted the 'more_vert'
            # XPath again rather than a 'Download code' entry. The behaviour is kept unchanged
            # because the kernel URL is read from the performance entries below; confirm on the
            # live site before retargeting this click.
            WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.XPATH, more_vert_xpath)))
            driver.find_element(By.XPATH, more_vert_xpath).click()

            # Read the browser's performance entries and pick the first kaggleusercontent kernel URL
            logs = driver.execute_script("var performance = window.performance || window.mozPerformance || window.msPerformance || window.webkitPerformance || {}; var network = performance.getEntries() || {}; return network;")
            kernel_link = next(
                (log['name'] for log in logs if log['name'].startswith('https://www.kaggleusercontent.com/kf/')),
                None,
            )
            kernel_match = re.search(r'/(\d+)/', kernel_link) if kernel_link is not None else None
            if kernel_match is None:
                # Previously a stale link from the prior notebook was silently reused here.
                raise RuntimeError(f"Could not determine the kernel id for notebook: {NOTEBOOK_URL}")
            kernel_id = kernel_match.group(1)

            # -------------------------------------------------------------------------------------------------------------------------------------------------------

            input_datasources = []
            if enabled('DATA SOURCES'):
                # The "/input" page is only needed (and only loaded) when data sources are requested.
                driver.get(NOTEBOOK_URL + "/input")
                try:
                    heading = WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.XPATH, "//p[text()='Data Sources']")))

                    # From the 'Data Sources' heading go two levels up, then read every <li> of the first <ul>
                    parent_element = heading.find_element(By.XPATH, "../..")
                    ul_element = parent_element.find_element(By.TAG_NAME, "ul")
                    for el in ul_element.find_elements(By.TAG_NAME, "li"):
                        # Skip the first 12 characters of each item (presumably an icon/label prefix - confirm).
                        input_datasources.append(el.text[12:])
                except (TimeoutException, NoSuchElementException):
                    input_datasources.append('None')
            else:
                input_datasources.append('None')

            # -------------------------------------------------------------------------------------------------------------------------------------------------------

            # Download the notebook source
            download_url = f'https://www.kaggle.com/kernels/scriptcontent/{kernel_id}/download'
            response = requests.get(download_url, timeout=30)  # timeout: never hang the whole run on one request
            response.raise_for_status()
            notebook_content = response.content

            # Parse the notebook content with nbformat
            notebook = nbformat.reads(notebook_content, as_version=4)

            # Extract text from the notebook cells
            all_parts = []
            code_parts = []
            markdown_parts = []
            if enabled('NOTEBOOK CELLS'):
                for cell in notebook.cells:
                    if cell.cell_type == 'code':
                        all_parts.append(cell.source + '\n')
                        code_parts.append(cell.source + '\n')
                    elif cell.cell_type == 'markdown':
                        all_parts.append(cell.source + '\n')
                        markdown_parts.append(cell.source + '\n')
            all_text = "".join(all_parts)
            code_text = "".join(code_parts)
            markdown_text = "".join(markdown_parts)

            # Forming list of Python libraries
            python_libraries = _extract_import_names(code_text) if enabled('PYTHON LIBRARIES') else []

            # -------------------------------------------------------------------------------------------------------------------------------------------------------

            df.loc[len(df.index)] = [notebook_name, NOTEBOOK_URL, public_score, private_score, medal, upvotes_amount, views_amount, run_time_info, update_date, all_text, code_text, markdown_text, input_datasources, python_libraries]

    # -------------------------------------------------------------------------------------------------------------------------------------------------------
    finally:
        driver.quit()

    end_time = time.time()
    print("Elapsed time:", end_time - start_time, "seconds")

    return df

In [3]:
import re
import pkgutil

#######################################################################

def get_valid_module_names():
    """Return the set of module names reported by ``pkgutil.iter_modules()``."""
    return {module_info.name for module_info in pkgutil.iter_modules()}

#######################################################################

def clean_library_names(library_names):
    """Reduce each import name to its top-level library.

    Every name is cut at the first ".", ":" or ";" and the leading part is kept,
    e.g. 'gemma.config' -> 'gemma'. Order and duplicates are preserved, so the
    result has one entry per input name.

    The previous version first scanned all installed modules and kept a name
    untouched when it was among them. Such a name contains no separator, so
    cutting it returns the same text; the scan only cost time and was dropped
    together with an unused regex and commented-out code.

    Parameters
    ----------
    library_names : iterable of str

    Returns
    -------
    list of str
    """
    return [re.split(r'[.:;]', name)[0] for name in library_names]


#######################################################################

def clean_sublibrary_names(library_names):
    """Clean dotted import names while keeping their sub-library parts.

    Each name is split on ".", ":" or ";". A piece is kept when it is a known
    module name (see get_valid_module_names) or looks like a plain identifier.
    The kept pieces are re-joined with "."; names with nothing left are dropped.
    """
    known_modules = get_valid_module_names()
    identifier = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')

    def is_kept(piece):
        # Known modules pass as-is; everything else must be a standalone word.
        return piece in known_modules or bool(identifier.match(piece))

    result = []
    for raw_name in library_names:
        kept_pieces = [piece for piece in re.split(r'[.:;]', raw_name) if is_kept(piece)]
        joined = ".".join(kept_pieces)
        if joined:
            result.append(joined)

    return result


#######################################################################

def count_strings(strings, unique_strings, NO_SUBLIBRARIES):
    """Count how often each entry of *unique_strings* occurs in *strings*.

    Parameters
    ----------
    strings : list of str
        Names to count. NOTE: when NO_SUBLIBRARIES is truthy this list is
        modified in place - every entry is cut at the first ".", ":" or ";"
        (existing behaviour, kept for callers that rely on it).
    unique_strings : iterable of str
        Names to report; the result keeps their order.
    NO_SUBLIBRARIES : bool
        Whether to reduce *strings* to top-level names before counting.

    Returns
    -------
    dict
        Maps each unique string to its number of occurrences (0 when absent).
    """
    from collections import Counter  # local import: keeps this block self-contained

    if NO_SUBLIBRARIES:
        strings[:] = [re.split(r'[.:;]', item)[0] for item in strings]

    # One pass over `strings` instead of one full `list.count` scan per unique string.
    occurrences = Counter(strings)
    return {unique_string: occurrences[unique_string] for unique_string in unique_strings}

In [4]:
def Code_parsing(competition_url, SORT_BY, NOTEBOOKS_AMOUNT, CSVs_SAVING_DIR, NO_SUBLIBRARIES, config):
    """Parse a competition's notebooks (or reuse a cached CSV) and report the library count.

    The CSV is named "<last URL segment>_<SORT_BY without spaces>_<NOTEBOOKS_AMOUNT>.csv"
    inside CSVs_SAVING_DIR. If it exists it is loaded; otherwise the notebooks are
    scraped with PARSING_COMPETITION_NOTEBOOKS and the result is saved there.

    Parameters
    ----------
    competition_url : str
    SORT_BY : str
        Passed to the scraper and used in the CSV file name.
    NOTEBOOKS_AMOUNT : int
    CSVs_SAVING_DIR : str
        Target directory; created if missing.
    NO_SUBLIBRARIES : bool or int (anything accepted by ``int()``)
        Truthy: count top-level libraries only. Falsy: count libraries with sub-libraries.
    config : dict
        Forwarded to PARSING_COMPETITION_NOTEBOOKS.

    Returns
    -------
    None
    """
    # Interpret the flag once so the printed setting and the branch below always agree.
    remove_sublibraries = bool(int(NO_SUBLIBRARIES))

    print("\n" * 8)  # nine blank lines in total, as before
    print("============== KAGGLE CODE PARSING (Firefox browser ONLY) =================")
    print("SETTINGS")
    print(f"        Competition url        -   {competition_url}")
    print(f"        Sorting by             -   {SORT_BY}")
    print(f"        Amount of notebooks    -   {NOTEBOOKS_AMOUNT}")
    print(f"        Save to directory      -   {CSVs_SAVING_DIR}")
    print(f"        Remove sublibraries    -   {'Yes' if remove_sublibraries else 'No'}")
    print("")

    file_name = list(filter(bool, competition_url.split('/')))[-1]
    full_file_name = f"{file_name}_{SORT_BY.replace(' ', '')}_{NOTEBOOKS_AMOUNT}.csv"
    # os.path.join works with or without a trailing separator on the directory.
    csv_path = os.path.join(CSVs_SAVING_DIR, full_file_name)

    if os.path.exists(csv_path):
        print("Loading existing file")
        df = pd.read_csv(csv_path)

        # transform LISTS columns into LISTS (since saving transforms them into STRINGS)
        df['python_libraries'] = df['python_libraries'].apply(ast.literal_eval)
        df['input_datasources'] = df['input_datasources'].apply(ast.literal_eval)
    else:
        print("Parsing data from competition + Saving")
        print("-----------------------------")

        # Create the target directory before the (slow) scrape so a bad path fails early,
        # not after all notebooks have been parsed.
        if CSVs_SAVING_DIR:
            os.makedirs(CSVs_SAVING_DIR, exist_ok=True)

        df = PARSING_COMPETITION_NOTEBOOKS(competition_url, SORT_BY, True, True, NOTEBOOKS_AMOUNT, config)  # COMPETITION_URL / SORT_BY / PYTHON_ONLY / excludeNonAccessedDatasources / NOTEBOOKS_AMOUNT

        df.to_csv(csv_path, index=False)

    # Flatten the per-notebook lists and drop duplicates while keeping first-seen order.
    all_python_libraries = list(OrderedDict.fromkeys(
        library for libraries in df['python_libraries'] for library in libraries
    ))

    if remove_sublibraries:
        # changed a little bit --- now 'gemma.config' is considered the 'gemma' library (previously it wasn't considered at all)
        cleaned_all_python_libraries_repeated = clean_library_names(all_python_libraries)
        cleaned_all_python_libraries = list(OrderedDict.fromkeys(cleaned_all_python_libraries_repeated))
        print("        TOTAL AMOUNT OF LIBRARIES = ", len(cleaned_all_python_libraries))
    else:
        cleaned_all_python_libraries = clean_sublibrary_names(all_python_libraries)
        print("        TOTAL AMOUNT OF LIBRARIES + SUB-LIBRARIES = ", len(cleaned_all_python_libraries))

    print("")
    print("DONE")
    print("===========================================================================")

## Run

**What to set up**:
- `url` - competition URL
- `my_list` - can be filled with custom notebook URLs
    - if filled, there is no need to change `sort_by` or `NOTEBOOKS TO PARSE`

In [6]:
# SORT_BY = 'vote count' # 'public score', 'vote count', 'comment count'
# NOTEBOOKS_AMOUNT = 10
# competition_url = "https://www.kaggle.com/competitions/ai-mathematical-olympiad-prize" #"https://www.kaggle.com/competitions/llm-prompt-recovery"
# CSVs_SAVING_DIR = "Kaggle notebooks CSVs/"
# NO_SUBLIBRARIES = True




# Provide links to notebooks here; leave the list empty to parse the top notebooks of `url` instead.
my_list = [
    #'https://www.kaggle.com/code/merckel/autoencoder-and-deep-features',
    'https://www.kaggle.com/code/donniedarko/darktaxi-tripdurationprediction-lb-0-385',
    'https://www.kaggle.com/code/jeffreycbw/nyc-taxi-trip-public-0-37399-private-0-37206',
    'https://www.kaggle.com/code/quentinmonmousseau/ml-workflow-lightgbm-0-37-randomforest-0-39',
    'https://www.kaggle.com/code/donniedarko/darktaxi-tripdurationprediction-lb-0-385',
]

# Flags selecting what is scraped for every notebook.
config = {
    'NAME': True,
    'UPDATE DATE': True,
    'UPVOTES': True,
    'MEDAL': False,
    'PUBLIC SCORE': False,
    'PRIVATE SCORE': False,
    'RUN TIME': False,
    'VIEWS': False,

    "NOTEBOOKS TO PARSE": 5,

    'DATA SOURCES': False,          # if any external data needed (not important)
    'NOTEBOOK CELLS': True,         # should always be True
    'PYTHON LIBRARIES': False,      # parse libraries names (required for visuals)
}

url = 'https://www.kaggle.com/competitions/nyc-taxi-trip-duration'
sort_by = 'vote count'

saving_dir = '_DOWNLOADED NOTEBOOKS/'
os.chdir('C:/_Github repositories')  # saving_dir is resolved relative to this directory
no_sublibraries = 0

if my_list:
    print("Custom parsing")

    config['MY NOTEBOOKS'] = True
    config['CUSTOM NOTEBOOKS LIST'] = my_list

    Code_parsing(url, sort_by, len(my_list), saving_dir, no_sublibraries, config)
else:
    print(f"Top {config['NOTEBOOKS TO PARSE']} by {sort_by} parsing")

    # Must be False here: with True the parser would iterate the (empty) custom list
    # and never collect links from the competition's code listing.
    config['MY NOTEBOOKS'] = False
    config['CUSTOM NOTEBOOKS LIST'] = my_list

    Code_parsing(url, sort_by, config['NOTEBOOKS TO PARSE'], saving_dir, no_sublibraries, config)


Custom parsing









SETTINGS
        Competition url        -   https://www.kaggle.com/competitions/nyc-taxi-trip-duration
        Sorting by             -   vote count
        Amount of notebooks    -   4
        Save to directory      -   _DOWNLOADED NOTEBOOKS/
        Remove sublibraries    -   No

Parsing data from competition + Saving
-----------------------------
PARSING LINKS,  Time =  0
LINKS DONE,  Time =  15.765558958053589
PARSING INFO FOR NOTEBOOK № 1/4,  Time =  15.765558958053589
PARSING INFO FOR NOTEBOOK № 2/4,  Time =  19.79638123512268
PARSING INFO FOR NOTEBOOK № 3/4,  Time =  23.839372634887695
PARSING INFO FOR NOTEBOOK № 4/4,  Time =  27.90632724761963
Elapsed time: 34.18342685699463 seconds
        TOTAL AMOUNT OF LIBRARIES + SUB-LIBRARIES =  0

DONE


## To txt

### EACH IN SEPARATE .txt

In [65]:
import os
import pandas as pd
import ast

# Base directory
CSVs_SAVING_DIR = 'C:/_Github repositories/_DOWNLOADED NOTEBOOKS/'

file_name = list(filter(bool, url.split('/')))[-1]

# The CSV name ends with the number of parsed notebooks: the custom list length when
# `my_list` is filled, otherwise the configured amount.
notebook_count = len(my_list) if my_list else config['NOTEBOOKS TO PARSE']
csv_path = os.path.join(CSVs_SAVING_DIR, f"{file_name}_{sort_by.replace(' ', '')}_{notebook_count}.csv")

# Read CSV and convert stringified lists back to actual lists
df = pd.read_csv(csv_path)
df['python_libraries'] = df['python_libraries'].apply(ast.literal_eval)
df['input_datasources'] = df['input_datasources'].apply(ast.literal_eval)

# Create subfolder for this notebook batch
main_folder = os.path.join(CSVs_SAVING_DIR, file_name)
os.makedirs(main_folder, exist_ok=True)

# Create sub-subfolder for individual notebook .txt files
individual_folder = os.path.join(main_folder, 'individual_notebooks')
os.makedirs(individual_folder, exist_ok=True)

# Save each notebook's full text as a separate .txt file
for i, row in df.iterrows():
    # Empty cells come back from the CSV as NaN (a float); treat them as missing.
    notebook_name = '' if pd.isna(row['notebook_name']) else str(row['notebook_name'])
    notebook_text = '' if pd.isna(row['notebook_full_text']) else str(row['notebook_full_text'])

    filename = f"{notebook_name}.txt"
    safe_filename = "".join(c for c in filename if c.isalnum() or c in (' ', '.', '_')).rstrip()
    if safe_filename == '.txt':
        # Nothing usable left of the name: fall back to the row index.
        safe_filename = f"notebook_{i}.txt"

    txt_path = os.path.join(individual_folder, safe_filename)
    with open(txt_path, 'w', encoding='utf-8') as f:
        f.write(notebook_text)

### ALL IN 1 .txt

In [9]:
import os
import pandas as pd
import ast

# Define base directory
CSVs_SAVING_DIR = 'C:/_Github repositories/_DOWNLOADED NOTEBOOKS/'
file_name = list(filter(bool, url.split('/')))[-1]

# Create subfolder for .txt files
subfolder_path = os.path.join(CSVs_SAVING_DIR, file_name)
os.makedirs(subfolder_path, exist_ok=True)

# Read the CSV file. Its name ends with the number of parsed notebooks: the custom
# list length when `my_list` is filled, otherwise the configured amount.
notebook_count = len(my_list) if my_list else config['NOTEBOOKS TO PARSE']
csv_path = os.path.join(CSVs_SAVING_DIR, f"{file_name}_{sort_by.replace(' ', '')}_{notebook_count}.csv")
df = pd.read_csv(csv_path)

# Convert stringified lists back to actual lists
df['python_libraries'] = df['python_libraries'].apply(ast.literal_eval)
df['input_datasources'] = df['input_datasources'].apply(ast.literal_eval)

# Save all 'notebook_full_text' entries into one combined .txt file inside the subfolder
combined_txt_path = os.path.join(subfolder_path, 'combined_notebooks.txt')
with open(combined_txt_path, 'w', encoding='utf-8') as f:
    for i, row in df.iterrows():
        # An empty text cell is read back from the CSV as NaN (a float), which f.write rejects.
        notebook_text = '' if pd.isna(row['notebook_full_text']) else str(row['notebook_full_text'])

        f.write(f"Notebook: {row['notebook_name']}\n")
        f.write(notebook_text)
        f.write("\n\n" + "="*80 + "\n\n")  # Separator between notebooks