In [None]:
import threading
import time
import random
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(threadName)s - %(message)s')

# 1. Basic Thread Example
class BasicThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        logging.info(f"{self.name} starting")
        time.sleep(random.uniform(0.5, 2))
        logging.info(f"{self.name} finished")

# 2. Thread with Lock (Synchronization)
class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self, thread_name):
        with self.lock:  # Thread-safe operation
            current_value = self.value
            time.sleep(0.1)  # Simulate some work
            self.value = current_value + 1
            logging.info(f"{thread_name} incremented counter to {self.value}")

# 3. Thread with Semaphore (Limited Resources)
class ResourcePool:
    def __init__(self, max_resources):
        self.semaphore = threading.Semaphore(max_resources)

    def use_resource(self, thread_name):
        with self.semaphore:
            logging.info(f"{thread_name} acquired resource")
            time.sleep(1)
            logging.info(f"{thread_name} released resource")

# 4. Producer-Consumer with Queue and Condition
class ProducerConsumer:
    def __init__(self):
        self.queue = Queue(maxsize=5)
        self.condition = threading.Condition()
        self.done = False

    def producer(self):
        for i in range(10):
            with self.condition:
                while self.queue.full():
                    self.condition.wait()
                item = f"Item-{i}"
                self.queue.put(item)
                logging.info(f"Produced {item}")
                self.condition.notify()
            time.sleep(0.5)
        with self.condition:
            self.done = True
            self.condition.notify_all()

    def consumer(self, name):
        while True:
            with self.condition:
                while self.queue.empty() and not self.done:
                    self.condition.wait()
                if self.queue.empty() and self.done:
                    break
                item = self.queue.get()
                logging.info(f"{name} consumed {item}")
                self.condition.notify()
            time.sleep(0.7)
            self.queue.task_done()

# 5. Thread Pool Example
def task(n):
    sleep_time = random.uniform(0.1, 1)
    time.sleep(sleep_time)
    return f"Task {n} completed after {sleep_time:.2f}s"

def main():
    # Basic Threads Demo
    logging.info("Starting Basic Threads Demo")
    threads = [BasicThread(f"Thread-{i}") for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    logging.info("Basic Threads Demo Completed\n")

    # Thread Synchronization with Lock Demo
    logging.info("Starting Counter Demo")
    counter = Counter()
    counter_threads = [
        threading.Thread(target=counter.increment, args=(f"Counter-Thread-{i}",))
        for i in range(5)
    ]
    for t in counter_threads:
        t.start()
    for t in counter_threads:
        t.join()
    logging.info(f"Final counter value: {counter.value}\n")

    # Semaphore Demo
    logging.info("Starting Resource Pool Demo")
    pool = ResourcePool(2)  # Only 2 resources available
    pool_threads = [
        threading.Thread(target=pool.use_resource, args=(f"Pool-Thread-{i}",))
        for i in range(5)
    ]
    for t in pool_threads:
        t.start()
    for t in pool_threads:
        t.join()
    logging.info("Resource Pool Demo Completed\n")

    # Producer-Consumer Demo
    logging.info("Starting Producer-Consumer Demo")
    pc = ProducerConsumer()
    producer_thread = threading.Thread(target=pc.producer, name="Producer")
    consumer_threads = [
        threading.Thread(target=pc.consumer, name=f"Consumer-{i}")
        for i in range(2)
    ]

    producer_thread.start()
    for t in consumer_threads:
        t.start()

    producer_thread.join()
    for t in consumer_threads:
        t.join()
    logging.info("Producer-Consumer Demo Completed\n")

    # Thread Pool Demo
    logging.info("Starting Thread Pool Demo")
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(6)]
        for future in futures:
            logging.info(future.result())
    logging.info("Thread Pool Demo Completed")

if __name__ == "__main__":
    main()

Exception in thread Consumer-0:
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
Exception in thread Consumer-1:
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
TypeError: ProducerConsumer.consumer() missing 1 required positional argument: 'name'
    self.run()
  File "/usr/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
TypeError: ProducerConsumer.consumer() missing 1 required positional argument: 'name'


KeyboardInterrupt: 

In [None]:
import threading
import time

def download_file(file_name):
    print(f"Starting download: {file_name}")
    time.sleep(2)  # Simulate download time
    print(f"Download completed: {file_name}")

# Creating multiple threads
threads = []
file_names = ["file1.zip", "file2.zip", "file3.zip"]

for file in file_names:
    t = threading.Thread(target=download_file, args=(file,))
    threads.append(t)
    t.start()

# Wait for all threads to finish
for t in threads:
    t.join()

print("All downloads completed!")

Starting download: file1.zip
Starting download: file2.zip
Starting download: file3.zip
Download completed: file1.zipDownload completed: file2.zip

Download completed: file3.zip
All downloads completed!


In [None]:
import multiprocessing

def square(n):
    print(f"Processing {n} squared: {n * n}")

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5]
    processes = []

    for num in numbers:
        p = multiprocessing.Process(target=square, args=(num,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("All processes completed!")

Processing 1 squared: 1
Processing 2 squared: 4Processing 3 squared: 9

Processing 4 squared: 16
Processing 5 squared: 25
All processes completed!


In [None]:
import threading
import time
import random
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

# Simple function to print with thread name
def log(message):
    print(f"{threading.current_thread().name}: {message}")

# 1. Basic Thread with Lock
class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.value += 1
            log(f"Counter value: {self.value}")
            time.sleep(0.1)

# 2. Producer-Consumer with Queue
def producer(queue):
    for i in range(5):
        item = f"Item-{i}"
        queue.put(item)
        log(f"Produced {item}")
        time.sleep(0.2)

def consumer(queue):
    while True:
        try:
            item = queue.get(timeout=1)
            log(f"Consumed {item}")
            queue.task_done()
            time.sleep(0.3)
        except:
            break

# 3. Simple Thread Pool Task
def task(n):
    time.sleep(random.uniform(0.1, 0.5))
    return f"Task {n} completed"

def main():
    # Basic Thread Demo with Lock
    print("Starting Counter Demo")
    counter = Counter()
    threads = [
        threading.Thread(target=counter.increment, name=f"Thread-{i}")
        for i in range(3)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Final counter value: {counter.value}\n")

    # Producer-Consumer Demo
    print("Starting Producer-Consumer Demo")
    queue = Queue(maxsize=3)
    prod = threading.Thread(target=producer, args=(queue,), name="Producer")
    cons = threading.Thread(target=consumer, args=(queue,), name="Consumer")

    prod.start()
    cons.start()

    prod.join()
    cons.join()
    print("Producer-Consumer Demo Completed\n")

    # Thread Pool Demo
    print("Starting Thread Pool Demo")
    with ThreadPoolExecutor(max_workers=2) as executor:
        results = executor.map(task, range(4))
        for result in results:
            print(f"Main: {result}")
    print("Thread Pool Demo Completed")

if __name__ == "__main__":
    main()

Starting Counter Demo
Thread-0: Counter value: 1
Thread-1: Counter value: 2
Thread-2: Counter value: 3
Final counter value: 3

Starting Producer-Consumer Demo
Producer: Produced Item-0
Consumer: Consumed Item-0
Producer: Produced Item-1
Consumer: Consumed Item-1
Producer: Produced Item-2
Producer: Produced Item-3
Consumer: Consumed Item-2
Producer: Produced Item-4
Consumer: Consumed Item-3
Consumer: Consumed Item-4
Producer-Consumer Demo Completed

Starting Thread Pool Demo
Main: Task 0 completed
Main: Task 1 completed
Main: Task 2 completed
Main: Task 3 completed
Thread Pool Demo Completed


In [None]:
import multiprocessing as mp
import time

# Section 1: Basic Process Creation
# This shows how to create and run separate processes
def say_hello(name):
    """A simple function that runs in a process."""
    print(f"Hello from {name}! Starting...")
    time.sleep(1)  # Wait a bit to show processes run separately
    print(f"Goodbye from {name}! Finished.")

# Section 2: Sharing Numbers Between Processes
# This shows how processes can share and change a single number
def add_one(shared_number, name):
    """Add 1 to a shared number safely."""
    # We use a lock to make sure only one process changes the number at a time
    with shared_number.get_lock():
        old_value = shared_number.value
        time.sleep(0.5)  # Wait to show the effect
        shared_number.value = old_value + 1
        print(f"{name} changed number to {shared_number.value}")

# Section 3: Sending Messages Between Processes
# This shows how processes can talk to each other using a queue
def sender(queue):
    """Send a message to the queue."""
    message = "Hi there!"
    queue.put(message)  # Put message in the queue
    print(f"Sender sent: {message}")

def receiver(queue):
    """Get a message from the queue."""
    message = queue.get()  # Take message from the queue
    print(f"Receiver got: {message}")

# Section 4: Doing Work with a Pool
# This shows how to split work across multiple processes easily
def double_number(n):
    """Double a number (simple work for a process)."""
    return n * 2

# Main function to run everything
def main():
    # Section 1: Basic Process Demo
    print("Starting Basic Process Demo")
    # Create 2 processes that say hello
    process1 = mp.Process(target=say_hello, args=("Friend-1",))
    process2 = mp.Process(target=say_hello, args=("Friend-2",))

    # Start both processes
    process1.start()
    process2.start()

    # Wait for them to finish
    process1.join()
    process2.join()
    print("Basic Process Demo Done!\n")

    # Section 2: Sharing Numbers Demo
    print("Starting Sharing Numbers Demo")
    # Create a shared number starting at 0
    number = mp.Value('i', 0)  # 'i' means integer

    # Create 3 processes to add 1 to the number
    adders = [
        mp.Process(target=add_one, args=(number, f"Adder-{i}"))
        for i in range(3)
    ]

    # Start all adders
    for adder in adders:
        adder.start()

    # Wait for all to finish
    for adder in adders:
        adder.join()
    print(f"Final number: {number.value}\n")

    # Section 3: Sending Messages Demo
    print("Starting Messages Demo")
    # Create a queue for messages
    queue = mp.Queue()

    # Create sender and receiver processes
    send_proc = mp.Process(target=sender, args=(queue,))
    recv_proc = mp.Process(target=receiver, args=(queue,))

    # Start both
    send_proc.start()
    recv_proc.start()

    # Wait for both to finish
    send_proc.join()
    recv_proc.join()
    print("Messages Demo Done!\n")

    # Section 4: Pool Demo
    print("Starting Pool Demo")
    # Create a pool of 2 processes
    with mp.Pool(processes=2) as pool:
        # Double some numbers using the pool
        results = pool.map(double_number, [1, 2, 3, 4])
        print(f"Doubled numbers: {results}")
    print("Pool Demo Done!")

# This makes sure the code runs correctly
if __name__ == "__main__":
    main()

Starting Basic Process Demo
Hello from Friend-1! Starting...
Hello from Friend-2! Starting...
Goodbye from Friend-1! Finished.
Goodbye from Friend-2! Finished.
Basic Process Demo Done!

Starting Sharing Numbers Demo
Adder-0 changed number to 1
Adder-1 changed number to 2
Adder-2 changed number to 3
Final number: 3

Starting Messages Demo
Sender sent: Hi there!
Receiver got: Hi there!
Messages Demo Done!

Starting Pool Demo
Doubled numbers: [2, 4, 6, 8]
Pool Demo Done!


In [None]:
import threading
import multiprocessing as mp
import time
from math import factorial

# Section 1: I/O-Bound Task (Simulated with Sleep)
def io_task(name, duration):
    """Simulate an I/O operation like waiting for a file or network."""
    print(f"{name} starting I/O task")
    time.sleep(duration)  # Simulate waiting
    print(f"{name} finished I/O task")

# Section 2: CPU-Bound Task (Factorial Calculation)
def cpu_task(n, name):
    """Calculate factorial of a large number (CPU-intensive)."""
    print(f"{name} starting CPU task")
    result = factorial(n)  # Heavy computation
    print(f"{name} finished CPU task with result length: {len(str(result))}")

# Section 3: Run with Multithreading
def run_threading(io_duration, cpu_n):
    print("\n=== Multithreading Demo ===")

    # I/O Task with Threads
    start_time = time.time()
    threads_io = [
        threading.Thread(target=io_task, args=(f"Thread-IO-{i}", io_duration))
        for i in range(4)
    ]
    for t in threads_io:
        t.start()
    for t in threads_io:
        t.join()
    io_time = time.time() - start_time
    print(f"Threading I/O time: {io_time:.2f} seconds")

    # CPU Task with Threads
    start_time = time.time()
    threads_cpu = [
        threading.Thread(target=cpu_task, args=(cpu_n, f"Thread-CPU-{i}"))
        for i in range(4)
    ]
    for t in threads_cpu:
        t.start()
    for t in threads_cpu:
        t.join()
    cpu_time = time.time() - start_time
    print(f"Threading CPU time: {cpu_time:.2f} seconds")

    return io_time, cpu_time

# Section 4: Run with Multiprocessing
def run_multiprocessing(io_duration, cpu_n):
    print("\n=== Multiprocessing Demo ===")

    # I/O Task with Processes
    start_time = time.time()
    processes_io = [
        mp.Process(target=io_task, args=(f"Process-IO-{i}", io_duration))
        for i in range(4)
    ]
    for p in processes_io:
        p.start()
    for p in processes_io:
        p.join()
    io_time = time.time() - start_time
    print(f"Multiprocessing I/O time: {io_time:.2f} seconds")

    # CPU Task with Processes
    start_time = time.time()
    processes_cpu = [
        mp.Process(target=cpu_task, args=(cpu_n, f"Process-CPU-{i}"))
        for i in range(4)
    ]
    for p in processes_cpu:
        p.start()
    for p in processes_cpu:
        p.join()
    cpu_time = time.time() - start_time
    print(f"Multiprocessing CPU time: {cpu_time:.2f} seconds")

    return io_time, cpu_time

# Main function to compare
def main():
    # Parameters
    IO_DURATION = 1  # Seconds to sleep (small I/O task)
    CPU_N = 50000    # Number for factorial (big CPU task)

    # Run both demos
    thread_io_time, thread_cpu_time = run_threading(IO_DURATION, CPU_N)
    process_io_time, process_cpu_time = run_multiprocessing(IO_DURATION, CPU_N)

    # Compare Results
    print("\n=== Comparison ===")
    print(f"I/O Task (sleep {IO_DURATION}s):")
    print(f"  Threading: {thread_io_time:.2f}s | Multiprocessing: {process_io_time:.2f}s")
    print(f"CPU Task (factorial of {CPU_N}):")
    print(f"  Threading: {thread_cpu_time:.2f}s | Multiprocessing: {process_cpu_time:.2f}s")

    # Analysis
    print("\n=== Why These Results? ===")
    print("1. I/O Task: Threading is usually faster because:")
    print("   - Threads share memory, so less overhead.")
    print("   - Python's GIL doesn't block I/O operations.")
    print("2. CPU Task: Multiprocessing is faster because:")
    print("   - Processes bypass the GIL, using multiple CPU cores.")
    print("   - Each process runs independently.")

if __name__ == "__main__":
    main()


=== Multithreading Demo ===
Thread-IO-0 starting I/O task
Thread-IO-1 starting I/O task
Thread-IO-2 starting I/O task
Thread-IO-3 starting I/O task


Exception in thread Exception in thread Thread-35 (cpu_task):
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-9-460ee7e03c4e>", line 18, in cpu_task
ValueError: Exceeds the limit (4300 digits) for integer string conversion; use sys.set_int_max_str_digits() to increase the limit
Thread-34 (cpu_task):
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-9-460ee7e03c4e>", line 18, in cpu_task
ValueError: Exceeds the limit (4300 digits) for integer string conversion; use sys.set_int_max_str_digits() to increase the limit


Thread-IO-0 finished I/O task
Thread-IO-1 finished I/O task
Thread-IO-2 finished I/O task
Thread-IO-3 finished I/O task
Threading I/O time: 1.00 seconds
Thread-CPU-0 starting CPU task
Thread-CPU-1 starting CPU task
Thread-CPU-2 starting CPU task
Thread-CPU-3 starting CPU task


Exception in thread Thread-37 (cpu_task):
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
Exception in thread Thread-36 (cpu_task):
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-9-460ee7e03c4e>", line 18, in cpu_task
ValueError: Exceeds the limit (4300 digits) for integer string conversion; use sys.set_int_max_str_digits() to increase the limit
    self.run()
  File "/usr/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-9-460ee7e03c4e>", line 18, in cpu_task
ValueError: Exceeds the limit (4300 digits) for integer string conversion; use sys.set_int_max_str_digits() to increase the limit


Threading CPU time: 0.22 seconds

=== Multiprocessing Demo ===
Process-IO-0 starting I/O task
Process-IO-1 starting I/O task
Process-IO-2 starting I/O task
Process-IO-3 starting I/O task
Process-IO-0 finished I/O task
Process-IO-1 finished I/O taskProcess-IO-2 finished I/O task
Process-IO-3 finished I/O task

Multiprocessing I/O time: 1.10 seconds
Process-CPU-0 starting CPU task
Process-CPU-2 starting CPU taskProcess-CPU-1 starting CPU task

Process-CPU-3 starting CPU task


Process Process-38:
Traceback (most recent call last):
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-9-460ee7e03c4e>", line 18, in cpu_task
    print(f"{name} finished CPU task with result length: {len(str(result))}")
                                                              ^^^^^^^^^^^
ValueError: Exceeds the limit (4300 digits) for integer string conversion; use sys.set_int_max_str_digits() to increase the limit
Process Process-36:
Process Process-37:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.11/multiprocessing/process.py", li

Multiprocessing CPU time: 0.33 seconds

=== Comparison ===
I/O Task (sleep 1s):
  Threading: 1.00s | Multiprocessing: 1.10s
CPU Task (factorial of 50000):
  Threading: 0.22s | Multiprocessing: 0.33s

=== Why These Results? ===
1. I/O Task: Threading is usually faster because:
   - Threads share memory, so less overhead.
   - Python's GIL doesn't block I/O operations.
2. CPU Task: Multiprocessing is faster because:
   - Processes bypass the GIL, using multiple CPU cores.
   - Each process runs independently.


Explanation of the Code and Concepts
1. I/O-Bound Task (Simulated with time.sleep)
What It Does: Pretends to wait for something (like a file or network) using sleep.
Why: Shows how threading and multiprocessing handle tasks that wait a lot.
In the Code: io_task sleeps for 1 second per task.
2. CPU-Bound Task (Factorial Calculation)
What It Does: Calculates the factorial of a big number (e.g., 50,000), which takes lots of CPU power.
Why: Shows how threading and multiprocessing handle heavy computation.
In the Code: cpu_task uses factorial to do intense math.
3. Multithreading Demo
What It Does: Runs 4 threads for both I/O and CPU tasks.
How: Uses threading.Thread to create lightweight threads that share memory.
Key Point: Python’s Global Interpreter Lock (GIL) limits true parallelism for CPU tasks, but I/O tasks can run concurrently.
4. Multiprocessing Demo
What It Does: Runs 4 processes for both I/O and CPU tasks.
How: Uses multiprocessing.Process to create separate processes with their own memory.
Key Point: Bypasses the GIL, allowing true parallel execution on multiple CPU cores.
5. Comparison and Analysis
I/O Results: Threading usually wins because:
Threads are lighter (less startup overhead).
GIL doesn’t block I/O operations (e.g., sleep).
CPU Results: Multiprocessing wins because:
Each process uses a separate CPU core.
No GIL restriction, so true parallelism happens.
Running in Google Colab
Copy: Paste this code into a Colab cell.
Run: Click the play button.
Output: You’ll see:
Messages from each thread/process starting and finishing.
Time taken for I/O and CPU tasks in both threading and multiprocessing.
A comparison showing which is faster and why.
Expected Results (Example)
I/O Task (1s sleep, 4 tasks):
Threading: ~1.0s (all threads wait at once).
Multiprocessing: ~1.2s (slight overhead from process creation).
CPU Task (factorial of 50,000, 4 tasks):
Threading: ~4-5s (GIL forces one-at-a-time execution).
Multiprocessing: ~1-2s (4 cores split the work).
Note: Exact times depend on your CPU (e.g., Colab’s virtual machine) and its core count.

Why Multiprocessing Performs Better for Bigger Tasks
The GIL Problem: In Python, the Global Interpreter Lock (GIL) means only one thread can execute Python code at a time, even on multi-core CPUs. For CPU-bound tasks (like math), threading doesn’t use extra cores.
Multiprocessing Advantage: Each process runs in its own Python interpreter, avoiding the GIL. For big CPU tasks (e.g., factorial of 50,000), multiprocessing splits work across cores, finishing faster.
Trade-off: Multiprocessing has more overhead (creating processes takes time), so it’s slower for small or I/O-bound tasks where threading shines.

In [None]:
!pip install pylint


Collecting pylint
  Downloading pylint-3.3.5-py3-none-any.whl.metadata (12 kB)
Collecting dill>=0.3.6 (from pylint)
  Downloading dill-0.3.9-py3-none-any.whl.metadata (10 kB)
Collecting astroid<=3.4.0-dev0,>=3.3.8 (from pylint)
  Downloading astroid-3.3.9-py3-none-any.whl.metadata (4.5 kB)
Collecting isort!=5.13.0,<7,>=4.2.5 (from pylint)
  Downloading isort-6.0.1-py3-none-any.whl.metadata (11 kB)
Collecting mccabe<0.8,>=0.6 (from pylint)
  Downloading mccabe-0.7.0-py2.py3-none-any.whl.metadata (5.0 kB)
Collecting tomlkit>=0.10.1 (from pylint)
  Downloading tomlkit-0.13.2-py3-none-any.whl.metadata (2.7 kB)
Downloading pylint-3.3.5-py3-none-any.whl (522 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m522.2/522.2 kB[0m [31m19.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading astroid-3.3.9-py3-none-any.whl (275 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m275.3/275.3 kB[0m [31m16.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.9-py3-no

In [None]:

# Save this code as a file to lint
code_to_lint = """
# Section 1: Naming Conventions
# Good naming
class DataProcessor:
    def __init__(self):
        self.item_count = 0  # Good: descriptive name

    def process_data(self, input_data):  # Good: clear parameter name
        return input_data.upper()

# Bad naming
class c:  # Pylint: invalid-name (too short)
    def __init__(self):
        self.x = 0  # Pylint: invalid-name (non-descriptive)

    def f(self, d):  # Pylint: invalid-name for both method and parameter
        return d

# Section 2: Unused Variables and Imports
import os  # Pylint: unused-import
from math import sin, cos  # cos is unused

def calculate_sine(x):
    result = sin(x)  # cos is imported but not used
    unused_var = 42  # Pylint: unused-variable
    return result

# Section 3: Code Structure and Formatting
# Good structure
def process_list(items: list) -> list:
    \"\"\"Process a list of items.

    Args:
        items: List of strings to process
    Returns:
        Processed list
    \"\"\"
    return [item.strip() for item in items]

# Bad structure
def BadFunc(x):#Pylint: invalid-name, missing-docstring
    y=1#Pylint: no-space-after-operator
    if x>0:#Pylint: no-space-after-operator
        return x+y #Pylint: multiple-statements

# Section 4: Type Hints and Documentation
# Good example
def add_numbers(num1: int, num2: int) -> int:
    \"\"\"Add two numbers together.

    Args:
        num1: First number
        num2: Second number
    Returns:
        Sum of numbers
    \"\"\"
    return num1 + num2

# Bad example
def sub(a,b): # Pylint: missing-docstring, invalid-name
    return a-b # No type hints

# Section 5: Variable Scope and Constants
MAX_COUNT = 100  # Good: constant in uppercase

def count_items():
    global MAX_COUNT  # Pylint: global-statement (avoid if possible)
    temp = 0
    for i in range(10):  # Pylint: unused-variable 'i' if not used
        temp += 1
    MAX_COUNT = temp  # Pylint: global-variable-not-assigned (if not needed)

# Section 6: Error Handling
# Good example
def safe_division(num: float, denom: float) -> float:
    try:
        return num / denom
    except ZeroDivisionError:
        return 0.0

# Bad example
def divide(a,b): # Pylint: missing-docstring
    return a/b # No error handling

# Section 7: Code Duplication and Complexity
# Good: DRY (Don't Repeat Yourself)
def format_name(first: str, last: str) -> str:
    return f"{first.strip()} {last.strip()}"

# Bad: Duplicated code
def print_name1(name):
    print(name.upper())
    print(name.lower())

def print_name2(name):
    print(name.upper())  # Pylint: duplicate-code
    print(name.lower())  # Repeated logic

# Section 8: Proper Main Function
def main():
    processor = DataProcessor()
    result = processor.process_data("hello")
    print(result)

if __name__ == "__main__":
    main()
"""

# Write the code to a file
with open("sample_code.py", "w") as f:
    f.write(code_to_lint)

# Run pylint on the file
!pylint sample_code.py

# Alternative: Run pylint with specific options
print("\nRunning Pylint with detailed output:")
!pylint sample_code.py --reports=y --score=y

************* Module sample_code
sample_code.py:7:0: C0303: Trailing whitespace (trailing-whitespace)
sample_code.py:15:0: C0303: Trailing whitespace (trailing-whitespace)
sample_code.py:1:0: C0114: Missing module docstring (missing-module-docstring)
sample_code.py:4:0: C0115: Missing class docstring (missing-class-docstring)
sample_code.py:8:4: C0116: Missing function or method docstring (missing-function-docstring)
sample_code.py:4:0: R0903: Too few public methods (1/2) (too-few-public-methods)
sample_code.py:12:0: C0115: Missing class docstring (missing-class-docstring)
sample_code.py:12:0: C0103: Class name "c" doesn't conform to PascalCase naming style (invalid-name)
sample_code.py:16:4: C0116: Missing function or method docstring (missing-function-docstring)
sample_code.py:12:0: R0903: Too few public methods (1/2) (too-few-public-methods)
sample_code.py:20:0: C0413: Import "import os" should be placed at the top of the module (wrong-import-position)
sample_code.py:21:0: C0413: Im