In [1]:
import requests
import pandas as pd
import time
import re

# Repository information
repo_owner = "lxp32"
repo_name = "lxp32-cpu"
branch = "develop"
path = "rtl"

# GitHub API URL to get contents of the directory
api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/contents/{path}?ref={branch}"

# Send request to GitHub API
response = requests.get(api_url)
contents = response.json()

# Filter for .vhd files
vhd_files = [item for item in contents if item['name'].endswith('.vhd')]

# Initialize list to store file content
files_data = []

def extract_comments(code):
    """Extract comments from VHDL code."""
    # Extract single line comments
    single_line_comments = re.findall(r'--(.+?)(?:\n|$)', code)
    
    # Extract block comments
    block_comments = re.findall(r'/\*(.*?)\*/', code, re.DOTALL)
    
    all_comments = single_line_comments + block_comments
    return [comment.strip() for comment in all_comments if comment.strip()]

def extract_between(text, start_marker, end_marker):
    """Extract text between two markers, inclusive of the markers."""
    start_idx = text.lower().find(start_marker.lower())
    if start_idx == -1:
        return None
    
    end_idx = text.lower().find(end_marker.lower(), start_idx)
    if end_idx == -1:
        return None
    
    return text[start_idx:end_idx + len(end_marker)]

def extract_section(code, start_pattern, end_pattern=';'):
    """Extract a section of code between patterns."""
    start_match = re.search(start_pattern, code, re.IGNORECASE)
    if not start_match:
        return None
    
    start_idx = start_match.end()
    # Find the end pattern after the start match
    code_after_start = code[start_idx:]
    
    # For more complex end patterns (like "end entity" or "end process")
    if ' ' in end_pattern:
        end_match = re.search(end_pattern, code_after_start, re.IGNORECASE)
        if not end_match:
            return None
        end_idx = end_match.start()
    else:
        # For simple end patterns like semicolons
        bracket_count = 0
        end_idx = -1
        for i, char in enumerate(code_after_start):
            if char == '(':
                bracket_count += 1
            elif char == ')':
                bracket_count -= 1
            elif char == end_pattern and bracket_count <= 0:
                end_idx = i
                break
        
        if end_idx == -1:
            return None
    
    return code_after_start[:end_idx].strip()

def extract_architecture_body(code):
    """Extract the architecture body between 'begin' and 'end architecture'."""
    arch_start = code.lower().find("architecture ")
    if arch_start == -1:
        return None
    
    begin_idx = code.lower().find(" begin", arch_start)
    if begin_idx == -1:
        return None
    
    end_arch = re.search(r'end\s+architecture|end\s+\w+', code[begin_idx:].lower())
    if not end_arch:
        return None
    
    end_idx = begin_idx + end_arch.start()
    return code[begin_idx:end_idx].strip()

def extract_concurrent_statements(code):
    """Extract concurrent signal assignments."""
    # This is simplified and may not catch all concurrent statements
    arch_body = extract_architecture_body(code)
    if not arch_body:
        return []
    
    # Split by semicolons and filter out process blocks
    lines = arch_body.split(';')
    concurrent_stmts = []
    
    for line in lines:
        line = line.strip()
        if line and 'process' not in line.lower() and 'begin' not in line.lower():
            concurrent_stmts.append(line)
    
    return concurrent_stmts

def extract_entity_ports(code):
    """Extract and categorize entity ports as inputs, outputs, or bidirectional."""
    port_section = extract_section(code, r'port\s*\(', r'\);')
    if not port_section:
        return [], [], []
    
    # Split into individual port declarations
    port_lines = [p.strip() for p in port_section.split(';')]
    
    inputs = []
    outputs = []
    bidirectional = []
    
    for port in port_lines:
        if not port:
            continue
            
        # Handle multiple signals on the same line
        parts = port.split(':')
        if len(parts) < 2:
            continue
            
        signal_names = [s.strip() for s in parts[0].split(',')]
        direction = parts[1].lower()
        
        for name in signal_names:
            if 'in' in direction and 'out' not in direction:
                inputs.append(f"{name}: {direction}")
            elif 'out' in direction and 'in' not in direction:
                outputs.append(f"{name}: {direction}")
            elif 'inout' in direction or ('in' in direction and 'out' in direction):
                bidirectional.append(f"{name}: {direction}")
    
    return inputs, outputs, bidirectional

def extract_generic_parameters(code):
    """Extract generic parameters if they exist."""
    generic_section = extract_section(code, r'generic\s*\(', r'\);')
    if not generic_section:
        return []
    
    # Split into individual generic declarations
    generic_lines = [g.strip() for g in generic_section.split(';')]
    generics = []
    
    for generic in generic_lines:
        if generic:
            generics.append(generic)
    
    return generics

def extract_processes(code):
    """Extract process blocks and their sensitivity lists."""
    processes = []
    
    # Pattern for process with sensitivity list
    pattern1 = re.compile(r'(\w+\s*:\s*)?process\s*\(([^)]*)\)(.*?)end\s+process', re.DOTALL | re.IGNORECASE)
    
    # Pattern for process without sensitivity list
    pattern2 = re.compile(r'(\w+\s*:\s*)?process\s*(?!\()(.+?)end\s+process', re.DOTALL | re.IGNORECASE)
    
    # Find processes with sensitivity lists
    for match in pattern1.finditer(code):
        process_label = match.group(1).strip(':').strip() if match.group(1) else "Unnamed process"
        sensitivity_list = match.group(2).strip()
        process_body = match.group(3).strip()
        
        # Extract wait statements if they exist
        wait_statements = re.findall(r'wait\s+([^;]+);', process_body, re.IGNORECASE)
        
        # Check for clock-related logic
        is_clocked = bool(re.search(r'rising_edge|falling_edge|clk\s*\'event', process_body, re.IGNORECASE))
        
        # Check for reset logic
        has_reset = bool(re.search(r'reset|rst', process_body, re.IGNORECASE))
        
        process_info = {
            "label": process_label,
            "sensitivity_list": sensitivity_list,
            "body": process_body,
            "wait_statements": wait_statements,
            "is_clocked": is_clocked,
            "has_reset": has_reset
        }
        processes.append(process_info)
    
    # Find processes without sensitivity lists
    for match in pattern2.finditer(code):
        process_label = match.group(1).strip(':').strip() if match.group(1) else "Unnamed process"
        process_body = match.group(2).strip()
        
        # Extract wait statements if they exist
        wait_statements = re.findall(r'wait\s+([^;]+);', process_body, re.IGNORECASE)
        
        # Check for clock-related logic
        is_clocked = bool(re.search(r'rising_edge|falling_edge|clk\s*\'event', process_body, re.IGNORECASE))
        
        # Check for reset logic
        has_reset = bool(re.search(r'reset|rst', process_body, re.IGNORECASE))
        
        process_info = {
            "label": process_label,
            "sensitivity_list": "None (uses wait statements)",
            "body": process_body,
            "wait_statements": wait_statements,
            "is_clocked": is_clocked,
            "has_reset": has_reset
        }
        processes.append(process_info)
    
    return processes

def extract_signals(code):
    """Extract signal declarations and their types."""
    signal_pattern = re.compile(r'signal\s+([^:]+):\s*([^;]+);', re.IGNORECASE)
    signals = []
    
    for match in signal_pattern.finditer(code):
        signal_names = match.group(1).strip()
        signal_type = match.group(2).strip()
        
        # Handle multiple signals on the same line
        for name in [n.strip() for n in signal_names.split(',')]:
            signals.append({"name": name, "type": signal_type})
    
    return signals

def extract_component_declarations(code):
    """Extract component declarations."""
    component_pattern = re.compile(r'component\s+(\w+)(.+?)end\s+component', re.DOTALL | re.IGNORECASE)
    components = []
    
    for match in component_pattern.finditer(code):
        component_name = match.group(1).strip()
        component_body = match.group(2).strip()
        components.append({"name": component_name, "body": component_body})
    
    return components

def extract_component_instantiations(code):
    """Extract component instantiations."""
    # This is a simplified pattern that might need refinement
    inst_pattern = re.compile(r'(\w+)\s*:\s*(?:entity\s+\w+\.)?(\w+)(?:\s+\w+\s+\w+)?(?:\s+generic\s+map\s*\(([^)]+)\))?(?:\s+port\s+map\s*\(([^)]+)\))', re.IGNORECASE)
    instantiations = []
    
    for match in inst_pattern.finditer(code):
        instance_name = match.group(1).strip()
        component_name = match.group(2).strip()
        generic_map = match.group(3).strip() if match.group(3) else None
        port_map = match.group(4).strip() if match.group(4) else None
        
        instantiations.append({
            "instance": instance_name,
            "component": component_name,
            "generic_map": generic_map,
            "port_map": port_map
        })
    
    return instantiations

def identify_fsm(code):
    """Identify if the VHDL code likely implements a Finite State Machine."""
    # Look for state variable declarations
    state_vars = re.findall(r'type\s+\w+\s+is\s*\(([^)]+)\)', code, re.IGNORECASE)
    state_signals = re.findall(r'signal\s+\w+(?:\s*,\s*\w+)*\s*:\s*\w+_state', code, re.IGNORECASE)
    
    # Look for case statements often used in FSMs
    case_statements = re.findall(r'case\s+(\w+)\s+is', code, re.IGNORECASE)
    
    # Look for state transitions
    next_state_assignments = re.findall(r'next_state|state\s*<=', code, re.IGNORECASE)
    
    # If we find evidence of states and transitions, it's likely an FSM
    return bool(state_vars or state_signals) and (case_statements or next_state_assignments)

def generate_vhdl_description(filename, code):
    """
    Generate a comprehensive description of VHDL code that would help an LLM understand its functionality.
    """
    description = f"# VHDL Analysis for '{filename}'\n\n"
    
    # Extract comments to understand the designer's intent
    comments = extract_comments(code)
    if comments:
        description += "## File Comments\n"
        for comment in comments[:10]:  # Limit to first 10 comments to avoid overwhelming
            description += f"- {comment}\n"
        if len(comments) > 10:
            description += f"- Plus {len(comments) - 10} more comments\n"
        description += "\n"
    
    # Extract libraries and packages
    libraries = re.findall(r'library\s+(\w+);', code, re.IGNORECASE)
    uses = re.findall(r'use\s+([^;]+);', code, re.IGNORECASE)
    
    if libraries or uses:
        description += "## Libraries and Packages\n"
        for lib in libraries:
            description += f"- Library: {lib}\n"
        for use in uses:
            description += f"- Use: {use}\n"
        description += "\n"
    
    # Extract entity information
    entity_block = extract_between(code, "entity", "end entity")
    if entity_block:
        entity_name_match = re.search(r'entity\s+(\w+)\s+is', entity_block, re.IGNORECASE)
        if entity_name_match:
            entity_name = entity_name_match.group(1)
            description += f"## Entity: {entity_name}\n\n"
            
            # Extract generic parameters
            generics = extract_generic_parameters(entity_block)
            if generics:
                description += "### Generic Parameters\n"
                for generic in generics:
                    description += f"- {generic}\n"
                description += "\n"
            
            # Extract and categorize ports
            inputs, outputs, bidirectional = extract_entity_ports(entity_block)
            
            if inputs:
                description += "### Input Ports\n"
                for port in inputs:
                    description += f"- {port}\n"
                description += "\n"
            
            if outputs:
                description += "### Output Ports\n"
                for port in outputs:
                    description += f"- {port}\n"
                description += "\n"
            
            if bidirectional:
                description += "### Bidirectional Ports\n"
                for port in bidirectional:
                    description += f"- {port}\n"
                description += "\n"
    
    # Extract architecture information
    arch_block = extract_between(code, "architecture", "end architecture")
    if not arch_block:
        arch_block = extract_between(code, "architecture", "end")  # Some VHDL uses shortened "end"
        
    if arch_block:
        arch_name_match = re.search(r'architecture\s+(\w+)\s+of\s+(\w+)', arch_block, re.IGNORECASE)
        if arch_name_match:
            arch_name = arch_name_match.group(1)
            arch_of = arch_name_match.group(2)
            description += f"## Architecture: {arch_name} of {arch_of}\n\n"
            
            # Extract signals
            signals = extract_signals(arch_block)
            if signals:
                description += "### Signals\n"
                for signal in signals:
                    description += f"- {signal['name']}: {signal['type']}\n"
                description += "\n"
            
            # Extract component declarations
            components = extract_component_declarations(arch_block)
            if components:
                description += "### Component Declarations\n"
                for component in components:
                    description += f"- {component['name']}\n"
                description += "\n"
            
            # Extract component instantiations
            instantiations = extract_component_instantiations(arch_block)
            if instantiations:
                description += "### Component Instantiations\n"
                for inst in instantiations:
                    description += f"- Instance '{inst['instance']}' of component '{inst['component']}'\n"
                description += "\n"
            
            # Extract processes
            processes = extract_processes(arch_block)
            if processes:
                description += "### Processes\n"
                for process in processes:
                    clock_info = " (Clocked)" if process["is_clocked"] else ""
                    reset_info = " (Has Reset)" if process["has_reset"] else ""
                    description += f"- Process '{process['label']}'{clock_info}{reset_info} with sensitivity list: {process['sensitivity_list']}\n"
                    
                    # Add brief description of what the process does
                    if "rising_edge" in process["body"] and "reset" in process["body"].lower():
                        description += "  - Implements synchronous logic with reset\n"
                    elif "rising_edge" in process["body"]:
                        description += "  - Implements synchronous logic\n"
                    elif "case" in process["body"].lower():
                        description += "  - Implements case-based selection logic\n"
                    elif "if" in process["body"].lower():
                        description += "  - Implements conditional logic\n"
                        
                    if process["wait_statements"]:
                        description += f"  - Contains wait statements: {', '.join(process['wait_statements'])}\n"
                
                description += "\n"
            
            # Extract concurrent statements (outside processes)
            concurrent_stmts = extract_concurrent_statements(arch_block)
            if concurrent_stmts:
                description += "### Concurrent Statements\n"
                for stmt in concurrent_stmts[:5]:  # Limit to 5 for brevity
                    description += f"- {stmt}\n"
                if len(concurrent_stmts) > 5:
                    description += f"- Plus {len(concurrent_stmts) - 5} more statements\n"
                description += "\n"
    
    # Higher-level analysis
    description += "## Functional Analysis\n\n"
    
    # Check if it's likely an FSM
    is_fsm = identify_fsm(code)
    if is_fsm:
        description += "- This appears to be a Finite State Machine implementation\n"
    
    # Clock domains
    clock_signals = re.findall(r'\b(\w+clk|\w+clock|\bclk\b|\bclock\b)\b', code, re.IGNORECASE)
    if clock_signals:
        unique_clocks = set(clock_signals)
        description += f"- Contains {len(unique_clocks)} clock domain(s): {', '.join(unique_clocks)}\n"
    
    # Reset signals
    reset_signals = re.findall(r'\b(\w*rst\w*|\w*reset\w*)\b', code, re.IGNORECASE)
    if reset_signals:
        unique_resets = set(reset_signals)
        description += f"- Uses reset signal(s): {', '.join(unique_resets)}\n"
    
    # Memory elements
    if re.search(r'\bram\b|\brom\b|\bmemory\b|\bfifo\b', code, re.IGNORECASE):
        description += "- Implements memory functionality\n"
    
    # ALU/computational functions
    if re.search(r'\balu\b|\badd\b|\bsubtract\b|\bmult\b|\bdivide\b|\bcompare\b', code, re.IGNORECASE):
        description += "- Implements arithmetic/computational functionality\n"
    
    # Control logic
    if re.search(r'\bcontrol\b|\bcontroller\b|\bfsm\b|\bstate\b', code, re.IGNORECASE):
        description += "- Implements control logic\n"
    
    # Interface logic
    if re.search(r'\bi2c\b|\bspi\b|\buart\b|\binterface\b|\bbus\b', code, re.IGNORECASE):
        description += "- Implements interface or communication protocol\n"
    
    # Pipeline stages
    if re.search(r'\bpipeline\b|\bstage\b', code, re.IGNORECASE):
        description += "- May be part of a pipeline architecture\n"
    
    # CPU components
    if re.search(r'\bcpu\b|\bprocessor\b|\bdecode\b|\bexecute\b|\bfetch\b|\bregister file\b', code, re.IGNORECASE):
        description += "- This appears to be a CPU component\n"
        
        if re.search(r'\bdecode\b', code, re.IGNORECASE):
            description += "- Implements instruction decoding functionality\n"
        if re.search(r'\bexecute\b', code, re.IGNORECASE):
            description += "- Implements instruction execution functionality\n"
        if re.search(r'\bfetch\b', code, re.IGNORECASE):
            description += "- Implements instruction fetching functionality\n"
        if re.search(r'\bregister file\b|\bregbank\b', code, re.IGNORECASE):
            description += "- Implements register file/bank functionality\n"
    
    # Special functionality
    if "function" in code.lower():
        function_names = re.findall(r'function\s+(\w+)', code, re.IGNORECASE)
        if function_names:
            description += f"- Defines custom functions: {', '.join(function_names)}\n"
    
    if "procedure" in code.lower():
        procedure_names = re.findall(r'procedure\s+(\w+)', code, re.IGNORECASE)
        if procedure_names:
            description += f"- Defines custom procedures: {', '.join(procedure_names)}\n"
    
    # Size/complexity metrics
    line_count = len(code.split('\n'))
    description += f"\n## Code Metrics\n"
    description += f"- Line count: {line_count}\n"
    description += f"- Process count: {len(processes) if 'processes' in locals() else 'N/A'}\n"
    description += f"- Signal count: {len(signals) if 'signals' in locals() else 'N/A'}\n"
    
    # Summary
    description += "\n## Summary\n"
    
    # Determine module type
    if "top" in filename.lower() or "top" in code.lower():
        description += "- This appears to be a top-level module in the design.\n"
    elif re.search(r'\balu\b', code, re.IGNORECASE):
        description += "- This module implements arithmetic and logic operations.\n"
    elif re.search(r'\bcontroller\b|\bcontrol\b', code, re.IGNORECASE):
        description += "- This module implements control logic.\n"
    elif re.search(r'\bmemory\b|\bram\b|\brom\b', code, re.IGNORECASE):
        description += "- This module implements memory functionality.\n"
    elif re.search(r'\bregister\b|\breg\b|\bff\b', code, re.IGNORECASE) and not re.search(r'\bregister file\b', code, re.IGNORECASE):
        description += "- This module implements register or flip-flop functionality.\n"
    elif re.search(r'\bdecode\b', code, re.IGNORECASE):
        description += "- This module implements decoding functionality.\n"
    elif re.search(r'\bpipeline\b', code, re.IGNORECASE):
        description += "- This module implements a pipeline stage.\n"
    elif re.search(r'\bi2c\b|\bspi\b|\buart\b', code, re.IGNORECASE):
        description += "- This module implements a communication interface.\n"
    else:
        description += "- This module appears to be a general-purpose digital logic implementation.\n"
    
    # Final contextual note for LXP32 CPU
    description += f"\nThis file is part of the LXP32 CPU design, which is likely a 32-bit processor implementation. The file '{filename}' plays a specific role in the overall CPU architecture.\n"
    
    return description

# Fetch content for each .vhd file
for file in vhd_files:
    file_url = file['download_url']
    file_response = requests.get(file_url)
    
    # Check if request was successful
    if file_response.status_code == 200:
        file_content = file_response.text
        description = generate_vhdl_description(file['name'], file_content)
        
        files_data.append({
            'input': description,
            'output': file_content
        })
    else:
        files_data.append({
            'input': f"Error analyzing file {file['name']}",
            'output': f"Error fetching file: {file_response.status_code}"
        })
    
    # Add a small delay to avoid hitting API rate limits
    time.sleep(0.5)

# Create a DataFrame with the results
df = pd.DataFrame(files_data)

# Save to CSV file
df.to_csv('lxp32_vhdl_files', index=False)

# Display the first row of the DataFrame to see an example
print(df.head(1))

# Optionally print some statistics
print(f"\nTotal VHDL files processed: {len(files_data)}")
print(f"Average description length: {df['input'].str.len().mean():.2f} characters")

                                               input  \
0  # VHDL Analysis for 'lxp32_alu.vhd'\n\n## File...   

                                              output  
0  ----------------------------------------------...  

Total VHDL files processed: 20
Average description length: 2059.50 characters


In [6]:
import requests
import re
import csv
from time import sleep

# Konfiguracija
GITHUB_API_URL = "https://api.github.com/repos/lxp32/lxp32-cpu/contents/verify/lxp32/src/platform"
RAW_BASE_URL = "https://raw.githubusercontent.com/lxp32/lxp32-cpu/develop/verify/lxp32/src/platform/"
OUTPUT_CSV = "vhdl_dataset_lxp32.csv"

def get_vhdl_files():
    """Dohvata listu svih VHDL fajlova sa GitHub-a"""
    response = requests.get(GITHUB_API_URL)
    if response.status_code == 200:
        files = response.json()
        return [file["name"] for file in files if file["name"].endswith(".vhd")]
    else:
        print(f"Error fetching files: {response.status_code}")
        return []

def clean_description(desc):
    """Čisti opis - uklanja copyright i dekorativne linije"""
    lines = []
    for line in desc.split('\n'):
        line = line.strip()
        if not line or 'copyright' in line.lower() or line.startswith('---'):
            continue
        if line.startswith('--'):
            line = line[2:].strip()
        lines.append(line)
    return ' '.join(lines).strip()

def extract_content(vhdl_content):
    """Izdvaja opis i kod iz VHDL sadržaja"""
    # Pronađi početak koda (nakon komentara)
    code_start = 0
    comment_block = re.search(r'^-{3,}.*?-{3,}', vhdl_content, re.DOTALL)
    if comment_block:
        code_start = comment_block.end()

    # Ekstrakcija opisa
    description = clean_description(vhdl_content[:code_start]) if code_start > 0 else ""

    # Ekstrakcija koda sa očuvanim formatom
    code = vhdl_content[code_start:].strip()
    # Ukloni linijske komentare ali zadrži nove redove
    code = '\n'.join([line for line in code.split('\n') if not line.strip().startswith('--')])

    return description, code

def main():
    vhdl_files = get_vhdl_files()

    with open(OUTPUT_CSV, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile, quoting=csv.QUOTE_ALL, escapechar='\\')
        writer.writerow(['input', 'output'])  # Header

        for filename in vhdl_files:
            print(f"Processing {filename}...")
            raw_url = RAW_BASE_URL + filename
            response = requests.get(raw_url)

            if response.status_code == 200:
                # Get your existing basic description
                basic_description, original_code = extract_content(response.text)
                
                if not basic_description:
                    basic_description = f"VHDL component {filename.split('.')[0]}"
                
                # Generate the comprehensive description
                detailed_description = generate_vhdl_description(filename, response.text)
                
                # Combine both descriptions
                combined_description = f"{basic_description}\n\n{detailed_description}"
                
                # Save the original code and combined description
                writer.writerow([combined_description, response.text])
                print(f"  Successfully processed {filename}")
            else:
                print(f"  Failed to fetch {filename}")

            sleep(1)  # To avoid rate limiting

    print(f"\nDone! Results saved to {OUTPUT_CSV}")

if __name__ == "__main__":
    main()

Processing coprocessor.vhd...
  Successfully processed coprocessor.vhd
Processing dbus_monitor.vhd...
  Successfully processed dbus_monitor.vhd
Processing generic_dpram.vhd...
  Successfully processed generic_dpram.vhd
Processing ibus_adapter.vhd...
  Successfully processed ibus_adapter.vhd
Processing intercon.vhd...
  Successfully processed intercon.vhd
Processing platform.vhd...
  Successfully processed platform.vhd
Processing program_ram.vhd...
  Successfully processed program_ram.vhd
Processing scrambler.vhd...
  Successfully processed scrambler.vhd
Processing timer.vhd...
  Successfully processed timer.vhd

Done! Results saved to vhdl_dataset_lxp32.csv


In [9]:
import requests
import re
import csv
from time import sleep
from urllib.parse import unquote

# Konfiguracija
GITHUB_API_URL = "https://api.github.com/repos/open-logic/open-logic/contents/src/base/vhdl"
RAW_BASE_URL = "https://raw.githubusercontent.com/open-logic/open-logic/main/src/base/vhdl/"
DOCS_BASE_URL = "https://raw.githubusercontent.com/open-logic/open-logic/main/doc/base/"
OUTPUT_CSV = "open_logic_vhdl_dataset.csv"

def get_vhdl_files():
    """Dohvata listu svih VHDL fajlova sa GitHub-a"""
    response = requests.get(GITHUB_API_URL)
    if response.status_code == 200:
        files = response.json()
        return [file["name"] for file in files if file["name"].endswith(".vhd")]
    else:
        print(f"Error fetching files: {response.status_code}")
        return []

def extract_md_description(md_content):
    """Ekstrahira Description dio iz markdown fajla"""
    # Pronađi Description sekciju
    desc_match = re.search(
        r'^##\s*Description\s*$(.*?)(?=^##\s|\Z)',
        md_content,
        re.DOTALL | re.MULTILINE | re.IGNORECASE
    )

    if not desc_match:
        return ""

    description = desc_match.group(1).strip()

    # Ukloni slike i specijalne markdown elemente
    description = re.sub(r'!\[.*?\]\(.*?\)', '', description)
    description = re.sub(r'`.*?`', '', description)

    # Očisti prazne linije i višestruke razmake
    clean_lines = []
    for line in description.split('\n'):
        line = line.strip()
        if line and not line.startswith('#'):
            clean_lines.append(line)

    return ' '.join(clean_lines).strip()

def get_description_from_docs(vhdl_filename):
    """Dohvata opis iz odgovarajućeg .md fajla u dokumentaciji"""
    md_filename = vhdl_filename.replace('.vhd', '.md')
    doc_url = DOCS_BASE_URL + md_filename

    try:
        response = requests.get(doc_url)
        if response.status_code == 200:
            return extract_md_description(response.text)
    except Exception as e:
        print(f"  Error fetching docs: {e}")

    return ""

def extract_vhdl_description(vhdl_content):
    """Izdvaja Description dio iz VHDL header-a"""
    desc_match = re.search(
        r'^-{3,}\s*Description\s*-{3,}\s*(.*?)(?=^-{3,}|\Z)',
        vhdl_content,
        re.DOTALL | re.MULTILINE
    )

    if not desc_match:
        return ""

    description = desc_match.group(1)
    clean_lines = []
    for line in description.split('\n'):
        line = line.strip()
        if line.startswith('--'):
            line = line[2:].strip()
        if line and not line.startswith('Documentation:') and not line.startswith('Note:'):
            clean_lines.append(line)

    return ' '.join(clean_lines).strip()

def extract_content(vhdl_filename, vhdl_content):
    """Glavna funkcija za ekstrakciju opisa i koda"""
    # Prvo pokušaj iz VHDL header-a
    description = extract_vhdl_description(vhdl_content)

    # Ako nema dobrog opisa, pokušaj iz dokumentacije
    if not description or len(description.split()) < 10:  # Ako je opis prekratak
        description = get_description_from_docs(vhdl_filename)

    # Ako i dalje nema opisa, koristi fallback
    if not description:
        description = f"VHDL component {vhdl_filename.split('.')[0]}"

    # Ekstrakcija koda
    code_start = 0
    header_end = re.search(r'^-{3,}\s*Libraries\s*-{3,}', vhdl_content, re.MULTILINE | re.IGNORECASE)
    if header_end:
        code_start = header_end.end()

    code = vhdl_content[code_start:].strip()
    code = '\n'.join([line for line in code.split('\n') if not line.strip().startswith('--')])

    return description, code

def main():
    vhdl_files = get_vhdl_files()

    with open(OUTPUT_CSV, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile, quoting=csv.QUOTE_ALL)
        writer.writerow(['input', 'output'])  # Header

        for filename in vhdl_files:
            print(f"Processing {filename}...")
            raw_url = RAW_BASE_URL + filename
            response = requests.get(raw_url)

            if response.status_code == 200:
                description, code = extract_content(filename, response.text)
                description += f"\n\n{generate_vhdl_description(filename, response.text)}"
                writer.writerow([description, code])
                print(f"  Successfully processed {filename}")
            else:
                print(f"  Failed to fetch {filename}")

            sleep(1)  # Rate limiting

    print(f"\nDone! Results saved to {OUTPUT_CSV}")

if __name__ == "__main__":
    main()

Processing olo_base_arb_prio.vhd...
  Successfully processed olo_base_arb_prio.vhd
Processing olo_base_arb_rr.vhd...
  Successfully processed olo_base_arb_rr.vhd
Processing olo_base_cam.vhd...
  Successfully processed olo_base_cam.vhd
Processing olo_base_cc_bits.vhd...
  Successfully processed olo_base_cc_bits.vhd
Processing olo_base_cc_handshake.vhd...
  Successfully processed olo_base_cc_handshake.vhd
Processing olo_base_cc_n2xn.vhd...
  Successfully processed olo_base_cc_n2xn.vhd
Processing olo_base_cc_pulse.vhd...
  Successfully processed olo_base_cc_pulse.vhd
Processing olo_base_cc_reset.vhd...
  Successfully processed olo_base_cc_reset.vhd
Processing olo_base_cc_simple.vhd...
  Successfully processed olo_base_cc_simple.vhd
Processing olo_base_cc_status.vhd...
  Successfully processed olo_base_cc_status.vhd
Processing olo_base_cc_xn2n.vhd...
  Successfully processed olo_base_cc_xn2n.vhd
Processing olo_base_crc.vhd...
  Successfully processed olo_base_crc.vhd
Processing olo_base_de

In [10]:
GITHUB_API_URL = "https://api.github.com/repos/open-logic/open-logic/contents/src/intf/vhdl"
RAW_BASE_URL = "https://raw.githubusercontent.com/open-logic/open-logic/main/src/intf/vhdl/"
DOCS_BASE_URL = "https://raw.githubusercontent.com/open-logic/open-logic/main/doc/intf/"
OUTPUT_CSV = "open_logic_intf_vhdl_dataset.csv"  # Novi naziv CSV fajla
main()

Processing olo_intf_clk_meas.vhd...
  Successfully processed olo_intf_clk_meas.vhd
Processing olo_intf_debounce.vhd...
  Successfully processed olo_intf_debounce.vhd
Processing olo_intf_i2c_master.vhd...
  Successfully processed olo_intf_i2c_master.vhd
Processing olo_intf_spi_master.vhd...
  Successfully processed olo_intf_spi_master.vhd
Processing olo_intf_spi_slave.vhd...
  Successfully processed olo_intf_spi_slave.vhd
Processing olo_intf_sync.vhd...
  Successfully processed olo_intf_sync.vhd
Processing olo_intf_uart.vhd...
  Successfully processed olo_intf_uart.vhd

Done! Results saved to open_logic_intf_vhdl_dataset.csv


In [11]:
GITHUB_API_URL = "https://api.github.com/repos/open-logic/open-logic/contents/src/axi/vhdl"
RAW_BASE_URL = "https://raw.githubusercontent.com/open-logic/open-logic/main/src/axi/vhdl/"
DOCS_BASE_URL = "https://raw.githubusercontent.com/open-logic/open-logic/main/doc/axi/"
OUTPUT_CSV = "open_logic_axi_vhdl_dataset.csv"
main()

Processing olo_axi_lite_slave.vhd...
  Successfully processed olo_axi_lite_slave.vhd
Processing olo_axi_master_full.vhd...
  Successfully processed olo_axi_master_full.vhd
Processing olo_axi_master_simple.vhd...
  Successfully processed olo_axi_master_simple.vhd
Processing olo_axi_pkg_protocol.vhd...
  Successfully processed olo_axi_pkg_protocol.vhd
Processing olo_axi_pl_stage.vhd...
  Successfully processed olo_axi_pl_stage.vhd

Done! Results saved to open_logic_axi_vhdl_dataset.csv


In [12]:
from datasets import load_dataset

# Login using e.g. `huggingface-cli login` to access this dataset
ds = load_dataset("rtl-llm/vhdl_github")

Repo card metadata block was not found. Setting CardData to empty.


In [14]:
import pandas as pd

train_dataset = ds['train']
first_five = train_dataset[:1000]


rows = []

for content in first_five['content']:
    if content is None:
        continue  # preskoči ako je None

    input_lines = []
    output_lines = []
    in_comment_block = True

    for line in content.splitlines():
        stripped = line.strip()
        if stripped.startswith("--"):
            if not any(keyword in stripped for keyword in ["Copyright", "Author", "Contact"]):
                clean_line = stripped[2:].strip("- ").strip()
                if clean_line:
                    input_lines.append(clean_line)
        else:
            in_comment_block = False
            output_lines.append(stripped)

    input_text = "\n".join(input_lines).strip()
    output_text = "\n".join(output_lines).strip()

    if input_text or output_text:
        rows.append({"input": input_text, "output": output_text})

# Snimi u CSV
df = pd.DataFrame(rows)
df.to_csv("huggingface_ds.csv", index=False)

print("Spremljeno u 'huggingface_ds.csv'")


Spremljeno u 'huggingface_ds.csv'


In [19]:
import pandas as pd
import os

def merge_datasets(file_list, output_filename="vhdl_etf.csv"):
    """
    Spaja više CSV fajlova sa VHDL podacima u jedan dataset
    
    Parameters:
        file_list (list): Lista putanja do CSV fajlova
        output_filename (str): Ime izlaznog fajla
    """
    # Lista za čuvanje svih dataframe-ova
    dataframes = []
    
    # Učitavanje svakog dataseta
    for file_path in file_list:
        try:
            print(f"Učitavanje: {file_path}")
            df = pd.read_csv(file_path, quotechar='"', escapechar='\\')
            print(f"Učitano {len(df)} zapisa iz {file_path}")
            dataframes.append(df)
        except Exception as e:
            print(f"Greška pri učitavanju {file_path}: {e}")
    
    # Spajanje svih dataframe-ova
    if dataframes:
        merged_df = pd.concat(dataframes, ignore_index=True)
        
        # Uklanjanje duplikata ako postoje
        before_dedup = len(merged_df)
        merged_df = merged_df.drop_duplicates()
        after_dedup = len(merged_df)
        
        # Snimanje spojenog dataseta
        merged_df.to_csv(output_filename, index=False, quotechar='"', escapechar='\\')
        
        print(f"\nUspešno spojeno {len(dataframes)} dataseta.")
        print(f"Ukupno zapisa: {before_dedup}")
        if before_dedup > after_dedup:
            print(f"Uklonjeno duplikata: {before_dedup - after_dedup}")
        print(f"Konačni broj zapisa: {after_dedup}")
        print(f"Rezultat sačuvan u: {output_filename}")
        
        return merged_df
    else:
        print("Nema validnih dataseta za spajanje.")
        return None

# Glavni deo koda
if __name__ == "__main__":
    # Lista dataseta koje treba spojiti
    datasets = [
        "open_logic_intf_vhdl_dataset.csv",
        "open_logic_vhdl_dataset.csv",
        "open_logic_axi_vhdl_dataset.csv",
        "vhdl_dataset_lxp32.csv",
        "lxp32_vhdl_files.csv",
    ]
    
    # Provera da li fajlovi postoje
    existing_files = []
    for ds in datasets:
        if os.path.exists(ds):
            existing_files.append(ds)
        else:
            print(f"Upozorenje: Fajl {ds} nije pronađen.")
    
    if existing_files:
        # Spojite datasete
        merged_data = merge_datasets(existing_files)
    else:
        print("Nijedan od navedenih fajlova nije pronađen.")

Učitavanje: open_logic_intf_vhdl_dataset.csv
Učitano 7 zapisa iz open_logic_intf_vhdl_dataset.csv
Učitavanje: open_logic_vhdl_dataset.csv
Učitano 37 zapisa iz open_logic_vhdl_dataset.csv
Učitavanje: open_logic_axi_vhdl_dataset.csv
Učitano 5 zapisa iz open_logic_axi_vhdl_dataset.csv
Učitavanje: vhdl_dataset_lxp32.csv
Učitano 9 zapisa iz vhdl_dataset_lxp32.csv
Učitavanje: lxp32_vhdl_files.csv
Učitano 20 zapisa iz lxp32_vhdl_files.csv

Uspešno spojeno 5 dataseta.
Ukupno zapisa: 78
Konačni broj zapisa: 78
Rezultat sačuvan u: vhdl_etf.csv
