<a href="https://colab.research.google.com/github/gurpinder7473/Advanced-Python-/blob/main/Mini_Project_06.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Advanced Python Assignments**



## **Part I: Process Automation**

Q1.   Create a file that contains 1000 lines of random strings.

In [None]:
import random
import string

def generate_random_string(length=50):
    """Return a random string of ``length`` ASCII letters and digits.

    Characters are drawn with replacement via ``random.choices``, so the
    result depends on the state of the global ``random`` generator.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return ''.join(picked)

def create_file_with_random_strings(filename, lines=1000):
    """Write ``lines`` newline-terminated random strings to ``filename``.

    The file is opened in ``'w'`` mode, so an existing file is overwritten.
    Each line comes from ``generate_random_string()`` with its default length.
    """
    with open(filename, 'w') as out:
        out.writelines(generate_random_string() + '\n' for _ in range(lines))

# Q1: write the file with the default line count (1000) into the working directory.
create_file_with_random_strings('random_1000_lines.txt')


Q2. Create a file that contains multiple lines of random strings and file size must be 5 MB.

In [None]:
import random
import string

def generate_random_line(length=100):
    """Return ``length`` random ASCII letters/digits followed by a newline."""
    alphabet = string.ascii_letters + string.digits
    body = ''.join(random.choices(alphabet, k=length))
    return f'{body}\n'

def create_file_5mb(filename='random_5MB.txt', target_size_mb=5):
    """Create ``filename`` filled with random lines, exactly ``target_size_mb`` MiB large.

    Args:
        filename: Path of the file to create (overwritten if it exists).
        target_size_mb: Desired size in MiB (1 MiB = 1024 * 1024 bytes). A
            fractional value is rounded down to a whole number of bytes.

    Raises:
        ValueError: If ``generate_random_line()`` returns an empty string,
            because the file could then never reach the target size.

    The last line is cut short when a full line would overshoot the target, so
    the file may end without a trailing newline.
    """
    target_size_bytes = int(target_size_mb * 1024 * 1024)
    written_bytes = 0

    # Binary mode: the bytes counted are exactly the bytes stored. In text mode
    # '\n' can be expanded on write (e.g. to '\r\n' on Windows), so counting
    # characters would drift away from the real file size.
    with open(filename, 'wb') as f:
        while written_bytes < target_size_bytes:
            chunk = generate_random_line().encode('utf-8')
            if not chunk:
                raise ValueError("generate_random_line() returned an empty line")
            # Trim the final line so the file does not grow past the target.
            chunk = chunk[:target_size_bytes - written_bytes]
            f.write(chunk)
            written_bytes += len(chunk)

# Q2: run with the defaults, i.e. write 'random_5MB.txt' into the working directory.
create_file_5mb()


Q3. Create 10 files that contain multiple lines of random strings; the size of each file must be 5 MB.

In [None]:
import os
import secrets
import string

NUM_FILES = 10                # number of files main() generates
TARGET_BYTES = 5 * 1024 ** 2  # 5 MiB per file
LINE_LEN = 64                 # characters per line, not counting the newline

# Alphabet for the random lines. random_line() samples it with secrets.choice,
# which draws from a cryptographically secure random source.
POOL = "".join((string.ascii_letters, string.digits, string.punctuation))

def random_line(n):
    """Return ``n`` characters, each drawn independently from ``POOL``."""
    chars = []
    for _ in range(n):
        chars.append(secrets.choice(POOL))
    return ''.join(chars)

def generate_one(filename):
    """Fill ``filename`` with random lines until it holds ``TARGET_BYTES`` bytes.

    Lines of ``LINE_LEN`` random characters plus a newline are appended while
    the stream position is below the target. If the last line pushes the
    position past the target, the file is truncated back to ``TARGET_BYTES``,
    which can leave a partial final line.
    """
    with open(filename, "w", encoding="utf-8") as out:
        while out.tell() < TARGET_BYTES:
            out.write(random_line(LINE_LEN) + "\n")
            if out.tell() <= TARGET_BYTES:
                continue
            # The line just written crossed the limit: cut the file back.
            out.truncate(TARGET_BYTES)
            break

def main():
    """Generate ``NUM_FILES`` files under ``random5MB_files/`` and print each size."""
    os.makedirs("random5MB_files", exist_ok=True)
    targets = [f"random5MB_files/random_{i:02d}.txt" for i in range(1, NUM_FILES + 1)]
    for fname in targets:
        generate_one(fname)
        size = os.path.getsize(fname)
        print(f"Wrote {size:,} bytes → {fname}")


# Entry-point guard: run the generation when this code is executed as __main__.
if __name__ == "__main__":
    main()


Q4.  Create 5 files of size 1GB, 2GB, 3GB, 4GB and 5GB; file contains multiple lines of random strings.

In [None]:
import os
import random
import string

def generate_random_line(line_length=100):
    """Return ``line_length`` random ASCII letters/digits plus a trailing newline."""
    charset = string.ascii_letters + string.digits
    sampled = random.choices(charset, k=line_length)
    sampled.append('\n')
    return ''.join(sampled)

def create_file(filename, size_gb):
    """Create ``filename`` filled with random lines, exactly ``size_gb`` GiB large.

    Args:
        filename: Path of the file to create (overwritten if it exists).
        size_gb: Desired size in GiB (1 GiB = 1024 ** 3 bytes). A fractional
            value is rounded down to a whole number of bytes.

    Raises:
        ValueError: If ``generate_random_line()`` returns an empty string,
            because the file could then never reach the target size.

    The last line is cut short when a full line would overshoot the target, so
    the file may end without a trailing newline.
    """
    size_bytes = int(size_gb * 1024 * 1024 * 1024)
    written_bytes = 0

    # Binary mode keeps the running count equal to the bytes on disk; in text
    # mode '\n' can be expanded on write and a character count would drift.
    with open(filename, 'wb') as f:
        while written_bytes < size_bytes:
            chunk = generate_random_line().encode('utf-8')
            if not chunk:
                raise ValueError("generate_random_line() returned an empty line")
            # Trim the final line so the file stops exactly at the target size.
            chunk = chunk[:size_bytes - written_bytes]
            f.write(chunk)
            written_bytes += len(chunk)

# Generate random_file_1GB.txt ... random_file_5GB.txt in the working directory.
# The requested sizes add up to 15 GiB, so this cell needs that much free disk space.
for i in range(1, 6):
    create_file(f'random_file_{i}GB.txt', i)


Q5.  Convert all the files of Q4 into upper case sequentially (one by one)

In [None]:
def convert_to_uppercase_sequential():
    """Upper-case the five Q4 files one after another.

    For i = 1..5, reads ``random_file_{i}GB.txt`` line by line and writes the
    upper-cased text to ``uppercase_file_{i}GB.txt`` in the working directory.
    """
    for i in range(1, 6):
        input_file = f'random_file_{i}GB.txt'
        output_file = f'uppercase_file_{i}GB.txt'

        with open(input_file, 'r') as src:
            with open(output_file, 'w') as dst:
                # Stream line by line so a multi-GB file is never held in memory.
                dst.writelines(line.upper() for line in src)

# Q5: convert the files in sequence; expects random_file_1GB.txt ... random_file_5GB.txt to exist.
convert_to_uppercase_sequential()


Q6. Convert all the files of Q4 into upper case parallel using multi-threading

In [None]:
import threading

def convert_file_uppercase(i):
    """Upper-case ``random_file_{i}GB.txt`` into ``uppercase_parallel_file_{i}GB.txt``.

    The input is streamed line by line, so the whole file is never loaded at once.
    """
    input_file = f'random_file_{i}GB.txt'
    output_file = f'uppercase_parallel_file_{i}GB.txt'

    with open(input_file, 'r') as src:
        with open(output_file, 'w') as dst:
            dst.writelines(map(str.upper, src))

# Start one thread per file (i = 1..5); each runs convert_file_uppercase(i).
threads = []
for i in range(1, 6):
    t = threading.Thread(target=convert_file_uppercase, args=(i,))
    threads.append(t)
    t.start()

# Wait for all threads to finish
for t in threads:
    t.join()


Q7. WAP to automatically download 10 images of cat from “Google Images”. [Hint: Find the package from
pypi.org and use it]

In [None]:
pip install google_images_download


In [None]:
from google_images_download import google_images_download

def download_cat_images():
    """Request 10 JPG images for the keyword "cat" via google_images_download.

    The options ask for the files to be stored under ``downloads/cats`` and
    for the image URLs to be printed.
    """
    downloader = google_images_download.googleimagesdownload()
    downloader.download({
        "keywords": "cat",
        "limit": 10,
        "print_urls": True,
        "format": "jpg",
        "output_directory": "downloads",
        "image_directory": "cats",
    })

# Q7: run the download (needs network access).
download_cat_images()


Q8. WAP to automatically download 10 videos of “Machine Learning” from “Youtube.com”. [Hint: Find the
package from pypi.org and use it]

In [None]:
from pytube import Search, YouTube

def download_machine_learning_videos(max_videos=10):
    """Download up to ``max_videos`` "Machine Learning" search hits into ``videos/``.

    For every search hit the lowest-resolution progressive MP4 stream is
    downloaded. A hit that fails, or that offers no such stream, is reported
    and skipped. A final message is printed when fewer than ``max_videos``
    downloads succeeded.

    Args:
        max_videos: Maximum number of successful downloads. Nothing is searched
            or downloaded when this is zero or negative.

    Note: this needs the ``pytube`` package, which this notebook only installs
    in a later cell - install it before running this one on a fresh kernel.
    """
    # The limit used to be checked only after a download, so 0 still fetched one.
    if max_videos <= 0:
        return

    query = "Machine Learning"
    search = Search(query)

    count = 0
    for video in search.results:
        try:
            yt = YouTube(video.watch_url)
            stream = yt.streams.filter(progressive=True, file_extension='mp4').get_lowest_resolution()
            if stream is None:
                # Say what is wrong instead of failing on `None.download`.
                print(f"Failed to download: no progressive MP4 stream for {video.watch_url}")
                continue
            print(f"Downloading: {yt.title}")
            stream.download(output_path='videos')
            count += 1
        except Exception as e:
            # Best effort: one broken video must not stop the remaining downloads.
            print(f"Failed to download: {e}")
            continue
        if count >= max_videos:
            break

    if count < max_videos:
        print(f"Only {count} of {max_videos} videos were downloaded.")

# Q8: run with the default limit of 10 videos (needs network access).
download_machine_learning_videos()


Q9. Convert all the videos of Q8 and convert it to audio. [Hint: Find the package from pypi.org and use it]

In [None]:
pip install moviepy


In [None]:
import os
from moviepy.editor import VideoFileClip

def convert_videos_to_audio(video_folder='videos', audio_folder='audios'):
    """Extract the audio track of every ``.mp4`` in ``video_folder`` as an MP3.

    Args:
        video_folder: Directory that is scanned (non-recursively) for files
            whose name ends in ``.mp4``.
        audio_folder: Directory for the ``.mp3`` files; created if missing.

    Each output keeps the video's base name with the final ``.mp4`` replaced by
    ``.mp3``. A file that cannot be converted is reported and skipped, and its
    clip is closed in every case.
    """
    os.makedirs(audio_folder, exist_ok=True)

    # Sorted only to make the processing order (and the log) deterministic.
    for filename in sorted(os.listdir(video_folder)):
        if not filename.endswith('.mp4'):
            continue

        video_path = os.path.join(video_folder, filename)
        # Swap only the trailing extension; str.replace would also rewrite a
        # '.mp4' that appears in the middle of the name.
        audio_path = os.path.join(audio_folder, filename[:-len('.mp4')] + '.mp3')

        video = None
        try:
            print(f"Converting {filename} to MP3...")
            video = VideoFileClip(video_path)
            if video.audio is None:
                print(f"Error processing {filename}: video has no audio track")
                continue
            video.audio.write_audiofile(audio_path)
        except Exception as e:
            print(f"Error processing {filename}: {e}")
        finally:
            # Close the clip even when the conversion failed.
            if video is not None:
                video.close()

# Q9: convert everything in 'videos/' into 'audios/' (the default folders).
convert_videos_to_audio()


Q10. Create an automated pipeline using multi-threading for:
“Automatic Download of 100 Videos from YouTube” → “Convert it to Audio”

In [None]:
pip install pytube moviepy


In [None]:
import os
import threading
from pytube import Search, YouTube
from moviepy.editor import VideoFileClip

# Create the output folders up front: download_video() writes to 'videos/' and
# convert_to_audio() writes to 'audios/'.
os.makedirs('videos', exist_ok=True)
os.makedirs('audios', exist_ok=True)

# ---------- Video Download Function ----------
def download_video(video_url, video_index):
    """Download one video as ``videos/video_{video_index}.mp4``.

    Args:
        video_url: Watch URL handed to ``YouTube``.
        video_index: Number used in the file name and as the log prefix.

    Returns:
        The relative path of the downloaded file, or ``None`` when the video
        has no progressive MP4 stream or the download fails.
    """
    try:
        yt = YouTube(video_url)
        stream = yt.streams.filter(progressive=True, file_extension='mp4').get_lowest_resolution()
        if stream is None:
            # Report the real cause instead of an AttributeError on None.
            print(f"[{video_index}] Error downloading video: no progressive MP4 stream available")
            return None
        filename = f'videos/video_{video_index}.mp4'
        print(f"[{video_index}] Downloading: {yt.title}")
        stream.download(filename=filename)
        print(f"[{video_index}] Download complete.")
        return filename
    except Exception as e:
        # Best effort: the caller skips the conversion when None is returned.
        print(f"[{video_index}] Error downloading video: {e}")
        return None

# ---------- Audio Conversion Function ----------
def convert_to_audio(video_path, index):
    """Write the audio track of ``video_path`` to ``audios/audio_{index}.mp3``.

    Args:
        video_path: Path of the video file to read.
        index: Number used in the output file name and as the log prefix.

    Failures are printed, not raised. The clip is closed whether or not the
    conversion succeeds.
    """
    video = None
    try:
        audio_path = f'audios/audio_{index}.mp3'
        print(f"[{index}] Converting to audio...")
        video = VideoFileClip(video_path)
        if video.audio is None:
            print(f"[{index}] Error converting video: no audio track")
            return
        video.audio.write_audiofile(audio_path, logger=None)
        print(f"[{index}] Audio saved: {audio_path}")
    except Exception as e:
        print(f"[{index}] Error converting video: {e}")
    finally:
        # Previously close() was skipped whenever write_audiofile() raised.
        if video is not None:
            video.close()

# ---------- Worker Thread Function ----------
def worker(video_url, index):
    """Download one video and, if the download produced a file, convert it to audio."""
    video_path = download_video(video_url, index)
    if not video_path:
        # download_video() returns None on failure - nothing to convert.
        return
    convert_to_audio(video_path, index)

# ---------- Main Pipeline ----------
def automated_pipeline(query="machine learning", num_videos=100, max_workers=8):
    """Search YouTube, then download and convert the hits on a bounded thread pool.

    Args:
        query: Search phrase handed to ``Search``.
        num_videos: Number of search hits to process.
        max_workers: Maximum number of videos handled concurrently.

    Each hit is processed by ``worker(url, index)`` with a 1-based index. The
    function returns once every worker has finished.
    """
    # Local import so this block does not depend on the cell's import lines.
    from concurrent.futures import ThreadPoolExecutor

    search = Search(query)

    # `search.results[:num_videos]` only slices the hits fetched so far. Ask
    # for further result pages until there are enough hits or none are left.
    # NOTE(review): relies on pytube's Search.get_next_results() extending
    # `results` and raising once no further page exists - see pytube's docs.
    while len(search.results) < num_videos:
        known = len(search.results)
        try:
            search.get_next_results()
        except Exception as e:
            print(f"Stopped fetching search results: {e!r}")
            break
        if len(search.results) == known:
            break

    video_urls = [video.watch_url for video in search.results[:num_videos]]
    if len(video_urls) < num_videos:
        print(f"Search returned only {len(video_urls)} of {num_videos} requested videos.")

    # A bounded pool replaces one thread per video, so the number of parallel
    # downloads and conversions no longer grows with the number of hits.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(worker, url, idx + 1) for idx, url in enumerate(video_urls)]

    # Leaving the `with` block waited for every task; surface unexpected errors.
    for idx, future in enumerate(futures, start=1):
        error = future.exception()
        if error is not None:
            print(f"[{idx}] Worker failed: {error}")

    print(" All videos downloaded and converted.")

# Q10: run the download -> audio pipeline (needs network access).
automated_pipeline()


Q11. Create an automated pipeline using multi-threading for: “Automatic Download of 500 images of Dog from
Google Images” → “Rescale it to 50%”

In [None]:
pip install icrawler Pillow


In [None]:
import os
import threading
from PIL import Image
from icrawler.builtin import GoogleImageCrawler

# Create the default input and output folders of the pipeline functions below.
os.makedirs("dog_images_original", exist_ok=True)
os.makedirs("dog_images_resized", exist_ok=True)

# ---------- Step 1: Download Images ----------
def download_images(keyword="dog", num_images=500, folder="dog_images_original"):
    """Crawl Google Images for ``keyword`` and store the results in ``folder``.

    Args:
        keyword: Search term passed to the crawler.
        num_images: Passed to the crawler as ``max_num``.
        folder: Root directory of the crawler's storage.
    """
    storage_config = {"root_dir": folder}
    crawler = GoogleImageCrawler(storage=storage_config)
    crawler.crawl(keyword=keyword, max_num=num_images)
    print("Image download completed.")

# ---------- Step 2: Resize Function ----------
def resize_image(image_path, output_folder):
    """Save a copy of ``image_path`` at half width and height in ``output_folder``.

    Args:
        image_path: Path of the image to read.
        output_folder: Existing directory that receives the resized copy; the
            copy keeps the original file name.

    Failures are printed, not raised, so one bad image cannot stop a batch.
    """
    try:
        # The context manager releases the source file when done; previously
        # the image opened here was never closed.
        with Image.open(image_path) as img:
            width, height = img.size
            # Never ask for a 0-pixel side: halving a 1-pixel side gives 0.
            new_size = (max(1, width // 2), max(1, height // 2))
            resized_img = img.resize(new_size)
            filename = os.path.basename(image_path)
            resized_img.save(os.path.join(output_folder, filename))
        print(f"Resized: {filename}")
    except Exception as e:
        print(f"Failed to resize {image_path}: {e}")

# ---------- Step 3: Threaded Resizing ----------
def resize_images_multithreaded(input_folder="dog_images_original", output_folder="dog_images_resized",
                                max_workers=16):
    """Resize every image in ``input_folder`` on a bounded thread pool.

    Args:
        input_folder: Directory scanned (non-recursively) for files ending in
            .png, .jpg, .jpeg or .bmp (case-insensitive).
        output_folder: Directory for the resized copies; created if missing.
        max_workers: Maximum number of images resized concurrently.

    Each image is handled by ``resize_image(image_path, output_folder)``. The
    function returns once every image has been processed.
    """
    # Local import so this block does not depend on the cell's import lines.
    from concurrent.futures import ThreadPoolExecutor

    # Make the function usable with folders other than the pre-created defaults.
    os.makedirs(output_folder, exist_ok=True)

    image_paths = [
        os.path.join(input_folder, filename)
        for filename in sorted(os.listdir(input_folder))
        if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp'))
    ]

    # A bounded pool replaces one thread per image, which for the 500 images
    # this pipeline requests meant hundreds of simultaneous threads.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(resize_image, path, output_folder) for path in image_paths]

    # Leaving the `with` block waited for every task; surface unexpected errors.
    for path, future in zip(image_paths, futures):
        error = future.exception()
        if error is not None:
            print(f"Failed to resize {path}: {error}")

    print("All images resized.")

# ---------- Pipeline Runner ----------
def image_pipeline():
    """Run the two stages in order: download the dog images, then resize them.

    Both stages are called with their default arguments.
    """
    stages = (download_images, resize_images_multithreaded)
    for stage in stages:
        stage()

# Q11: run the download -> resize pipeline (needs network access).
image_pipeline()


Q12. Create a random dataset of 100 rows and 30 columns. All the values are defined between [1,200]. Perform
the following operations:
(i) Replace all the values in the dataset that lie in [10, 60] with NA. Print the number of
rows having missing values.
(ii) Replace all the NA values with the average of the column value.
(iii) Find the Pearson correlation among all the columns and plot heat map. Also select those columns
having correlation <=0.7.
(iv) Normalize all the values in the dataset between 0 and 10.
(v) Replace all the values in the dataset with 1 if value <=0.5 else with 0.

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Reproducible 100 x 30 table of integers drawn from [1, 200].
np.random.seed(42)
df = pd.DataFrame(np.random.randint(1, 201, size=(100, 30)))

# (i) Replace every value in [10, 60] with NA and count the affected rows.
# DataFrame has no `.between` (only Series does), so the inclusive range test
# is written with element-wise comparisons.
df_na = df.mask((df >= 10) & (df <= 60))
rows_with_na = df_na.isnull().any(axis=1).sum()
print("Rows with missing values:", rows_with_na)

# (ii) Fill each NA with the mean of the remaining values in its column.
df_filled = df_na.fillna(df_na.mean(numeric_only=True))

# (iii) Pearson correlation between all columns, drawn as a heat map.
corr = df_filled.corr(method='pearson')
plt.figure(figsize=(10, 8))
sns.heatmap(corr, cmap='coolwarm', linewidths=0.5).set_title("Correlation Heatmap")
plt.show()

# (iii, continued) Keep the columns whose absolute correlation with every
# other column is at most 0.7.
selected_cols = [
    col
    for col in corr.columns
    if corr[col].drop(index=col).abs().le(0.7).all()
]
df_selected = df_filled[selected_cols]
print("Columns with correlation ≤ 0.7:", selected_cols)

# (iv) Min-max scale every selected column to the range [0, 10].
df_normalized = (
    df_selected
    .sub(df_selected.min())
    .div(df_selected.max() - df_selected.min())
    .mul(10)
)

# (v) 1 where the scaled value is <= 0.5, otherwise 0.
df_binary = df_normalized.le(0.5).astype(int)
print(df_binary.head())


Q13. Create a random dataset of 600 rows and 15 columns. All the values are defined between [-100,100].
Perform the following operations:
(i) Plot scatter graph between Column 5 and Column 6.
(ii) Plot histogram of each column in single graph.
(iii) Plot the Box plot of each column in single graph.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Reproducible 600 x 15 table of integers drawn from [-100, 100].
np.random.seed(42)
df = pd.DataFrame(data=np.random.randint(low=-100, high=101, size=(600, 15)))

# (i) Scatter plot of the 5th against the 6th column (labels 4 and 5, 0-based).
plt.figure(figsize=(6, 4))
plt.scatter(x=df[4], y=df[5], alpha=0.6, color='teal')
plt.xlabel("Column 5")
plt.ylabel("Column 6")
plt.title("Scatter Plot: Column 5 vs Column 6")
plt.grid(True)
plt.tight_layout()
plt.show()

# (ii) Overlaid histograms of all columns on a single set of axes.
df.plot.hist(bins=20, alpha=0.6, figsize=(10, 6))
plt.xlabel("Value Range")
plt.title("Histogram of All Columns")
plt.tight_layout()
plt.show()

# (iii) One vertical box per column, all in a single figure.
plt.figure(figsize=(12, 6))
sns.boxplot(data=df, orient='v')
plt.xlabel("Columns")
plt.ylabel("Values")
plt.title("Box Plot of All Columns")
plt.tight_layout()
plt.show()
