In [1]:
import io
import zipfile
import requests
import frontmatter

In [2]:
doc_extensions = {'md', 'mdx'}
code_extensions = {'py', 'sql', 'java', 'ipynb'}

extensions = doc_extensions | code_extensions

def read_repo_data(repo_owner, repo_name):
    """
    Download and parse all markdown files from a GitHub repository.
    
    Args:
        repo_owner: GitHub username or organization
        repo_name: Repository name
    
    Returns:
        List of dictionaries containing file content and metadata
    """
    prefix = 'https://codeload.github.com' 
    url = f'{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/main'
    resp = requests.get(url)
    
    if resp.status_code != 200:
        raise Exception(f"Failed to download repository: {resp.status_code}")

    repository_data = []
    zf = zipfile.ZipFile(io.BytesIO(resp.content))
    
    for file_info in zf.infolist():
        filepath = file_info.filename
        filepath_lower = filepath.lower()

        if filepath_lower.endswith('/'):
            continue

        filename = filepath_lower.split('/')[-1]

        if filename.startswith('.'):
            continue

        ext = filename.split('.')[-1]

        if ext not in extensions:
            continue

        filepath_edited = filepath.split('/', maxsplit=1)[1]

        try:
            with zf.open(file_info) as f_in:
                content = f_in.read().decode('utf-8', errors='ignore')
                if ext in doc_extensions:
                    post = frontmatter.loads(content)
                    data = post.to_dict()
                    data['filename'] = filepath_edited
                elif ext in code_extensions:
                    data = {
                        'code': True,
                        'content': content,
                        'filename': filepath_edited
                    }

                repository_data.append(data)
        except Exception as e:
            print(f"Error processing {filename}: {e}")
            continue

    zf.close()
    return repository_data

In [3]:
de_zoomcamp_data = read_repo_data('DataTalksClub', 'data-engineering-zoomcamp')

In [4]:
len(de_zoomcamp_data)

192

In [5]:
index = {}

for record in de_zoomcamp_data:
    index[record['filename']] = record

In [6]:
import nbformat
from nbconvert import MarkdownExporter
from nbconvert.preprocessors import ClearOutputPreprocessor

exporter = MarkdownExporter()
exporter.register_preprocessor(ClearOutputPreprocessor(), enabled=True)

def format_notebook_as_md(raw_notebook: str) -> str:
    nb_parsed = nbformat.reads(
        raw_notebook,
        as_version=nbformat.NO_CONVERT,
    )
    md_body, _ = exporter.from_notebook_node(nb_parsed)
    return md_body

In [7]:
def strip_code_fence(text: str) -> str:
    text = text.strip()

    if not text.startswith("```"):
        return text

    lines = text.splitlines()
    lines = lines[1:]

    if lines and lines[-1].strip() == "```":
        lines = lines[:-1]

    return "\n".join(lines)

In [8]:
from openai import OpenAI

openai_client = OpenAI()

In [9]:
def llm(instructions, content, model='gpt-4o-mini'):
    messages = [
        {"role": "system", "content": instructions},
        {"role": "user", "content": content}
    ]

    response = openai_client.responses.create(
        model='gpt-4o-mini',
        input=messages,
    )

    return response.output_text

In [10]:
notebook_editing_instructions = """
You're a professional coding editor.

You are given a Markdown file that was converted from a Jupyter notebook.  
The file already contains code blocks and inline comments.  

Your task:

- Turn it into clear, well-structured documentation.  
- Add section headers (##) where appropriate. Keep sections relatively large (8-10 paragraphs and code blocks)
- Add concise, high-level explanations for each code block.  
- Summarize what the code is doing without being overly verbose.  
- Keep the formatting in Markdown.
- Aim for a balance: clear enough to guide someone new, but not overloaded with detail. 

Output the improved Markdown file with the new documentation.
""".strip()

code_doc_instructions = """
You are given a piece of source code.  

Your task:  
- Analyze the code and produce a clear, high-level description of what it does.  
- If the code defines functions, methods, or classes, describe their purpose and role.  
- If it’s just a script without explicit functions/classes, summarize what the script does step by step at a high level.  
- Add logical sections or headings (##) if needed. Sections must be relatively large (8-10 paragraphs and code blocks)
- Keep explanations concise and clear — avoid unnecessary verbosity.  
- Output the result in Markdown, structured like documentation.  
- Do not rewrite or modify the code itself, only provide descriptive documentation.
""".strip()

In [23]:
# First, open the file and read its content
with open('data-processing-code.ipynb', 'r', encoding='utf-8') as f:
    raw_notebook_content = f.read()

result = llm(notebook_editing_instructions, md_body)
print(result)
#result = llm(system_prompt, md_body)

```markdown
# Kafka Data Producer for NYC Taxi Data

This document describes a Python script that produces NYC taxi trip data to a Kafka topic. The script leverages the `KafkaProducer` class from the `kafka` library and processes data from a CSV file of green taxi trips.

## Step 1: Importing Required Libraries

To start, we import the necessary libraries. The `json` library is utilized for data serialization, while `KafkaProducer` is used to send messages to a Kafka topic.

```python
import json
from kafka import KafkaProducer
```

### Explanation
- `json`: This module helps in converting Python objects into JSON format, making them suitable for transmission.
- `KafkaProducer`: This part of the Kafka library provides the functionality to send records to a Kafka topic.

## Step 2: Defining the JSON Serializer

Next, a function is defined to serialize data into JSON format. This will be necessary for sending messages to Kafka.

```python
def json_serializer(data):
    return json.dumps(

In [13]:
from tqdm.auto import tqdm

In [21]:
ipynb_data = []

for record in de_zoomcamp_data:
    if record.get('code') == True and record['filename'].endswith('.ipynb'):
        ipynb_data.append(record)


print(f'processing {len(ipynb_data)} jupyter notebooks...')

for record in tqdm(ipynb_data):
    md_body = format_notebook_as_md(record['content'])
    new_content = llm(notebook_editing_instructions, md_body)
    new_content = strip_code_fence(new_content)
    record['content'] = new_content
    record['code'] = False

processing 19 jupyter notebooks...


  0%|          | 0/19 [00:00<?, ?it/s]

In [19]:
code_data = []

for record in de_zoomcamp_data:
    if record.get('code') != True:
        continue

    path = record['filename']
    ext = path.split('.')[-1]

    if ext not in code_extensions:
        continue

    if ext == 'ipynb':
        continue

    # print(path)
    code_data.append(record)

print(f'processing {len(code_data)} code files...')

processing 78 code files...


In [24]:
for record in tqdm(code_data):
    code = record['content']

    new_content = llm(code_doc_instructions, code)
    new_content = strip_code_fence(new_content)

    record['content'] = new_content
    record['code'] = False

  0%|          | 0/78 [00:00<?, ?it/s]

In [25]:
import json

In [26]:
!mkdir data

mkdir: cannot create directory ‘data’: File exists


In [27]:
output_file = 'data/de-zoomcamp-processed.json'

with open(output_file, 'w', encoding='utf-8') as f_out:
    json.dump(de_zoomcamp_data, f_out, indent=2)

In [28]:
!head data/de-zoomcamp-processed.json

[
  {
    "content": "## Terraform Overview\n\n[Video](https://www.youtube.com/watch?v=18jIzE41fJ4&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=2)\n\n### Concepts\n\n#### Introduction\n\n1. What is [Terraform](https://www.terraform.io)?\n   * open-source tool by [HashiCorp](https://www.hashicorp.com), used for provisioning infrastructure resources\n   * supports DevOps best practices for change management\n   * Managing configuration files in source control to maintain an ideal provisioning state \n     for testing and production environments\n2. What is IaC?\n   * Infrastructure-as-Code\n   * build, change, and manage your infrastructure in a safe, consistent, and repeatable way \n     by defining resource configurations that you can version, reuse, and share.\n3. Some advantages\n   * Infrastructure lifecycle management\n   * Version control commits\n   * Very useful for stack-based deployments, and with cloud providers such as AWS, GCP, Azure, K8S\u2026\n   * State-based approach t

In [29]:
def sliding_window(seq, size, step):
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    n = len(seq)
    result = []
    for i in range(0, n, step):
        chunk = seq[i:i+size]
        result.append({'start': i, 'chunk': chunk})
        if i + size >= n:
            break

    return result

In [30]:
de_zoomcamp_chunks = []

for doc in de_zoomcamp_data:
    doc_copy = doc.copy()
    doc_content = doc_copy.pop('content')
    chunks = sliding_window(doc_content, 2000, 1000)
    for chunk in chunks:
        chunk.update(doc_copy)
    de_zoomcamp_chunks.extend(chunks)

In [31]:
len(de_zoomcamp_chunks)

862

In [32]:
de_zoomcamp_chunks[100]

{'start': 3000,
 'chunk': "ile based on the specified service and year.\n   - It builds the full request URL to download the file and retrieves it using the `requests` library. The content is saved locally.\n   \n3. After downloading, the script reads the CSV file into a pandas DataFrame while handling gzip compression.\n   \n4. The data is then converted into a Parquet file, and the original file name is modified accordingly.\n   \n5. Using the `upload_to_gcs` function, the Parquet file is uploaded to GCS at the specified path.\n\n6. It prints messages to inform the user of each step (local file download, Parquet conversion, and upload to GCS).\n\n## Main Script Execution\n\nAt the end of the script, the `web_to_gcs` function is called twice for different years and a specific service ('green'):\n- It processes data for 'green' taxis for the years '2019' and '2020'.\n\n### Optional Execution\n\nThere are additional commented-out calls to `web_to_gcs` that would process data for 'yellow