In [2]:
import pyspark
print(pyspark.__version__)

3.5.0


In [4]:
!pip install faker


Collecting faker
  Downloading Faker-26.0.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-26.0.0-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: faker
Successfully installed faker-26.0.0


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, current_date, date_sub, rand
import subprocess
import os

# Initializing Spark Session
spark = SparkSession.builder.appName("RandomFile").getOrCreate()

# Function to get file size
def get_file_size(file_path):
    if not os.path.exists(file_path):
        return 0
    result = subprocess.run(['du', '-sb', file_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        print(f"Error getting file size: {result.stderr.decode()}")
        return 0
    return int(result.stdout.split()[0])

# Setting the initial parameters
initial_count = 100000 
output_path = "randomly_generated_large_dataset"
target_size = 10 * 1024 * 1024  


while True:
    # Generating DataFrame
    df = spark.range(0, initial_count).select(
        expr("uuid()").alias("id"),
        expr("concat('FirstName', cast(id as string))").alias("first_name"),
        expr("concat('LastName', cast(id as string))").alias("last_name"),
        expr("concat(cast(rand()*1000 as int), ' Main St, City, Country')").alias("address"),
        date_sub(current_date(), (rand()*365*72).cast("int")).alias("date_of_birth")
    )

    # Writing to CSV file
    df.write.csv(output_path, header=True, mode="append")

    # Checking the current size
    current_size = get_file_size(output_path)
    print(f"Current size: {current_size / (1024*1024):.2f} MB")

    if current_size >= target_size:
        break

    # Calculating the new count for next iteration
    increase_factor = target_size / current_size
    initial_count = int(initial_count * increase_factor)

print(f"Successfully Generated CSV file of size: {current_size / (1024*1024*1024):.2f} GB")

# Once we are done, we stop the Spark Session
spark.stop()

Current size: 40.63 MB
Successfully Generated CSV file of size: 0.04 GB


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, expr, current_date, date_sub, rand, concat, lit
from pyspark.sql.types import StringType
import random
from faker import Faker
import os
import subprocess

# Initialize Spark Session
spark = SparkSession.builder.appName("RandomAddressGenerator").getOrCreate()

# Lists for generating random addresses
street_names = ["Main", "Oak", "Pine", "Maple", "Cedar", "Elm", "Washington", "Lake", "Hill"]
street_types = ["St", "Ave", "Rd", "Blvd", "Ln", "Dr", "Way", "Pl"]
cities = ["Springfield", "Franklin", "Clinton", "Greenville", "Bristol", "Fairview", "Salem", "Madison"]
states = ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN"]

fake = Faker()

@udf(returnType=StringType())
def random_first_name():
    return fake.first_name()

@udf(returnType=StringType())
def random_last_name():
    return fake.last_name()


# UDF for generating random addresses
@udf(returnType=StringType())
def generate_random_address():
    street_num = random.randint(100, 9999)
    street_name = random.choice(street_names)
    street_type = random.choice(street_types)
    city = random.choice(cities)
    state = random.choice(states)
    zip_code = random.randint(10000, 99999)
    return f"{street_num} {street_name} {street_type}, {city}, {state} {zip_code}"

# Function to get file size (unchanged from your original code)
def get_file_size(file_path):
    if not os.path.exists(file_path):
        return 0
    result = subprocess.run(['du', '-sb', file_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        print(f"Error getting file size: {result.stderr.decode()}")
        return 0
    return int(result.stdout.split()[0])
    

# Set initial parameters
initial_count = 100000  # Adjust as needed
output_path = "random_sample_data"
target_size = 10 * 1024   

# Main loop
while True:
    # Generate DataFrame with random addresses
    df = spark.range(0, initial_count).select(
        random_first_name().alias("first_name"),
        random_last_name().alias("last_name"),
        generate_random_address().alias("address"),
        date_sub(current_date(), (rand()*365*72).cast("int")).alias("date_of_birth")
    )

    # Write DataFrame to CSV
    df.write.csv(output_path, header=True, mode="append")

    # Check current size
    current_size = get_file_size(output_path)
    print(f"Current size: {current_size / (1024*1024):.2f} MB")

    if current_size >= target_size:
        break

    # Calculate new count for next iteration
    increase_factor = target_size / current_size
    initial_count = int(initial_count * increase_factor)

print(f"Successfully Generated CSV file of size: {current_size / (1024*1024*1024):.2f} GB")

# Stop Spark Session
spark.stop()

Current size: 5.87 MB
Successfully Generated CSV file of size: 0.01 GB
