<a href="https://colab.research.google.com/github/PawelJakubczyk/random_pyspark_df_generator/blob/main/random_pyspark_df_generator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install libraries

In [3]:
%pip install faker
%pip install pyspark
%pip install delta-spark



# Import libraries & start session

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DataType, StructType, StructField, StringType, DateType, DecimalType, IntegerType, NumericType, DoubleType, FloatType
from py4j.protocol import Py4JJavaError
from delta import configure_spark_with_delta_pip
from datetime import datetime, timedelta, date
from faker import Faker
from faker.providers import BaseProvider
from random import choice, uniform, randint, seed
import string
import re

# Build a local Spark session with the Delta Lake SQL extension and catalog configured.
# (A plain session without Delta would be:
#  SparkSession.builder.master("local[*]").getOrCreate())
builder = (
    SparkSession.builder
    .appName('DeltaLakeLocal')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# The builder is passed through configure_spark_with_delta_pip before the session is created.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Shared Faker instance used by the generator functions below
fake = Faker()

# Define random data generation functions

<style>
  .tables {
    display: flex;
    justify-content: space-between;
  }
</style>

<div class="tables">

| Function           | Description                                                                |
|--------------------|----------------------------------------------------------------------------|
| `seed()`           | Initialize the random number generator                                    |
| `getstate()`       | Returns the current internal state of the random number generator          |
| `setstate()`       | Restores the internal state of the random number generator                 |
| `getrandbits()`    | Returns a number representing the random bits                               |
| `randrange()`      | Returns a random number between the given range with an optional step argument |
| `randint()`        | Returns a random integer within the given range (both endpoints included)   |
| `choice()`         | Returns a random element from the given sequence                            |
| `choices()`        | Returns a list with a random selection from the given sequence              |
| `shuffle()`        | Shuffles the given sequence in place (returns `None`)                        |
| `sample()`         | Returns a given sample of a sequence                                         |
| `random()`         | Returns a random float number between 0 and 1                                |
| `uniform()`        | Returns a random float number between two given parameters                   |
| `triangular()`     | Returns a random float number between two given parameters with an optional mode parameter |
| `betavariate()`    | Returns a random float number between 0 and 1 based on the Beta distribution (used in statistics) |
| `expovariate()`    | Returns a random float number based on the Exponential distribution (used in statistics) |
| `gammavariate()`   | Returns a random float number based on the Gamma distribution (used in statistics) |
| `gauss()`          | Returns a random float number based on the Gaussian distribution (used in probability theories) |
| `lognormvariate()` | Returns a random float number based on a log-normal distribution (used in probability theories) |
| `normalvariate()`  | Returns a random float number based on the normal distribution (used in probability theories) |
| `vonmisesvariate()`| Returns a random float number based on the von Mises distribution (used in directional statistics) |
| `paretovariate()`  | Returns a random float number based on the Pareto distribution (used in probability theories) |
| `weibullvariate()` | Returns a random float number based on the Weibull distribution (used in statistics) |

</div>
<div class="tables">

| Faker Method                | Description                                       |
|-----------------------------|---------------------------------------------------|
| `fake.name()`               | Generate a random full name.                      |
| `fake.address()`            | Generate a random address.                         |
| `fake.email()`              | Generate a random email address.                  |
| `fake.phone_number()`       | Generate a random phone number.                   |
| `fake.date_of_birth()`      | Generate a random date of birth.                  |
| `fake.company()`            | Generate a random company name.                   |
| `fake.job()`                | Generate a random job title.                      |
| `fake.text()`               | Generate random text.                             |
| `fake.sentence()`           | Generate a random sentence.                       |
| `fake.word()`               | Generate a random word.                           |
| `fake.color_name()`         | Generate a random color name.                     |
| `fake.credit_card_number()` | Generate a random credit card number.             |
| `fake.url()`                | Generate a random URL.                            |
| `fake.domain_name()`        | Generate a random domain name.                    |
| `fake.ipv4()`               | Generate a random IPv4 address.                  |
| `fake.country()`            | Generate a random country name.                   |
| `fake.city()`               | Generate a random city name.                      |
| `fake.state()`              | Generate a random state name.                     |
| `fake.zipcode()`            | Generate a random ZIP code.                       |
| `fake.currency_code()`      | Generate a random currency code.                  |
| `fake.file_name()`          | Generate a random file name.                      |
| `fake.image_url()`          | Generate a random image URL.                      |
| `fake.random_element()`     | Generate a random element from a list.           |
| `fake.random_number()`      | Generate a random number.                         |
| `fake.boolean()`            | Generate a random boolean value.                  |
| `fake.paragraph()`          | Generate a random paragraph.                      |
| `fake.word_list()`          | Generate a list of random words.                 |
| `fake.unique.random_number()`| Generate a unique random number.                 |
| `fake.unique.random_element()`| Generate a unique random element from a list.   |

</div>

In [5]:
class CustomProvider(BaseProvider):
    """Faker provider with extra business-flavoured values.

    Each method returns one randomly chosen value; most pick from a small
    hard-coded list through ``self.random_element``.
    """

    def industry(self):
        """Return one industry name from a fixed list."""
        industries = ["Technology", "Finance", "Healthcare", "Manufacturing", "Retail", "Telecommunications"]
        return self.random_element(industries)

    def phone_number_digits(self):
        """Return a generated phone number reduced to its digits, as an ``int``.

        Every non-digit character (separators, brackets, extension markers)
        is stripped before the conversion.
        """
        # Ask the generator this provider is attached to, not the
        # module-level ``fake``: the provider then also works (and follows
        # the right seed) when registered on a different Faker instance.
        raw_number = self.generator.phone_number()
        return int(re.sub(r'\D', '', raw_number))

    def gender(self):
        """Return "Male" or "Female"."""
        gender = ["Male", "Female"]
        return self.random_element(gender)

    def marital_status(self):
        """Return one marital status from a fixed list."""
        marital_status = ["Single", "Married", "Divorced"]
        return self.random_element(marital_status)

    def employment_status(self):
        """Return one employment status from a fixed list."""
        employment_status = ["Employed", "Unemployed", "Self-employed"]
        return self.random_element(employment_status)

    def legal_status(self):
        """Return one company legal form from a fixed list."""
        legal_status = ["LLC", "Corporation", "Partnership"]
        return self.random_element(legal_status)

    def product_status(self):
        """Return one product status; the empty string is one of the options."""
        product_status = ["On Hold", "Abandoned", "Completed", ""]
        return self.random_element(product_status)

    def product_name(self):
        """Return a name of the form "<adjective> <noun> - <5-digit code>"."""
        adjectives = ["Sleek", "Luxurious", "Stylish", "Innovative", "Premium"]
        nouns = ["Chair", "Table", "Lamp", "Sofa", "Bed"]
        # These draws use the ``random`` module directly (not Faker's RNG);
        # the code is zero-padded to five characters.
        product_code = str(randint(1,99999)).zfill(5)

        return f"{choice(adjectives)} {choice(nouns)} - {product_code}"

    def department(self):
        """Return one department name from a fixed list."""
        departments = ["Sales", "Marketing", "Finance", "HR", "IT", "Operations"]
        return self.random_element(departments)

# Register CustomProvider on the shared `fake` instance; the generator
# functions below call its methods as fake.industry(), fake.gender(), etc.
fake.add_provider(CustomProvider)

def calculate_age(birthdate: date) -> int:
    """Return the number of completed years between ``birthdate`` and today."""
    now = date.today()
    years = now.year - birthdate.year
    # The birthday has not come round yet this calendar year: one year less.
    if (now.month, now.day) < (birthdate.month, birthdate.day):
        years -= 1
    return years

def format_date_for_pyspark(date_to_format):
    """Render ``date_to_format`` as a "YYYY-MM-DD HH:MM:SS" string via ``strftime``."""
    timestamp_pattern = "%Y-%m-%d %H:%M:%S"
    formatted = date_to_format.strftime(timestamp_pattern)
    return formatted

In [6]:
def create_random_person_info() -> dict:
    """Build one fake person record keyed by the Person_* column names.

    The age is derived from the generated date of birth, so the two fields
    always agree.
    """
    dob = fake.date_of_birth(tzinfo=None, minimum_age=18, maximum_age=65)

    # Fields are filled one by one, in column order.
    person = {}
    person["Person_Name"] = fake.first_name()
    person["Person_Last_Name"] = fake.last_name()
    person["Person_Age"] = calculate_age(dob)
    person["Person_Date_Of_Birth"] = dob
    person["Person_Gender"] = fake.gender()
    person["Person_Phone_Number"] = fake.phone_number_digits()
    person["Person_Email"] = fake.email()
    person["Person_Country_Long"] = fake.country()
    person["Person_Zip_Code"] = fake.zipcode()
    person["Person_Street"] = fake.street_address()
    person["Person_Country_Short"] = fake.country_code(representation="alpha-2")
    person["Person_Income"] = round(uniform(20000, 100000), 2)
    person["Person_Marital_Status"] = fake.marital_status()
    person["Person_Employment_Status"] = fake.employment_status()
    return person


def create_random_company_info() -> dict:
    """Build one fake company record.

    The founding date lies between 1825 and 36525 days before the current
    moment.
    """
    days_since_founding = randint(1825, 36525)
    founded_on = (datetime.now() - timedelta(days=days_since_founding)).date()

    # Fields are filled one by one, in column order.
    company = {}
    company["Company_Name"] = fake.company()
    company["Company_Type"] = fake.bs()
    company["Industry"] = fake.industry()
    city = fake.city()
    country = fake.country()
    company["Location"] = f"{city}, {country}"
    company["Employee_Count"] = randint(10, 1000)
    company["Revenue"] = round(uniform(1000000, 100000000), 2)
    company["Website"] = fake.url()
    company["Email"] = fake.company_email()
    company["Description"] = fake.catch_phrase()
    company["Founded_Date"] = founded_on
    company["CEO"] = fake.name()
    company["Contact_Number"] = fake.phone_number_digits()
    company["Rating"] = round(uniform(1.0, 5.0), 2)
    company["Legal_Status"] = fake.legal_status()
    return company


def create_random_product_info_v1() -> dict:
    """Build one fake product/material record with a validity window.

    Date rules:
      * ``Validity_Date_From`` is drawn between 2012-01-01 and 2030-12-31.
      * ``Validity_Date_To`` is drawn between ``Validity_Date_From`` and
        2030-12-31.
      * ``Modification_Date`` is drawn between ``Validity_Date_From`` and the
        current moment.

    Returns:
        dict: one record keyed by column name.
    """
    start_date = datetime.strptime("2012-01-01", '%Y-%m-%d')
    end_date = datetime.strptime("2030-12-31", '%Y-%m-%d')

    validity_from = fake.date_time_between(start_date=start_date, end_date=end_date)
    validity_to = fake.date_time_between(start_date=validity_from, end_date=end_date)

    # The validity window may start in the future. Clamp the upper bound so
    # the range handed to Faker is never inverted.
    modification_end = max(validity_from, datetime.now())
    # This value is the one stored in the record; previously it was computed
    # and then thrown away in favour of an unrelated random date.
    modification_date = fake.date_time_between(start_date=validity_from, end_date=modification_end)

    return {
        "ID": f'{randint(0, 999999)}-{choice(string.ascii_letters)}',
        "Subsector": fake.word(),
        "Category": fake.word(),
        "Brand": fake.word(),
        "Material_ID": randint(10000000, 99999999),
        "Description": fake.text(max_nb_chars=100),
        # One letter followed by a zero-padded three-digit number
        "Plant_Code": f'{choice(string.ascii_letters)}{str(randint(0, 999)).zfill(3)}',
        "Plant_Name": fake.word(),
        "Validity_Date_From": validity_from,
        "Validity_Date_To": validity_to,
        "Modification_Date": modification_date,
        "Status": fake.product_status()
    }


def create_random_product_info_v2() -> dict:
    """Build one fake retail product record keyed by the Product_* column names."""
    columns = (
        "Product_Name",
        "Product_Category",
        "Product_Description",
        "Product_Price",
        "Product_Stock",
        "Product_Manufacturer",
        "Product_Weight",
        "Product_Rating",
        "Product_SKU",
    )
    # Values are listed (and therefore generated) in column order.
    values = (
        fake.product_name(),
        fake.word(),
        fake.text(max_nb_chars=100),
        round(uniform(10.0, 1000.0), 2),
        randint(0, 1000),
        fake.company(),
        round(uniform(0.1, 10.0), 2),
        round(uniform(1.0, 5.0), 2),
        fake.uuid4(),
    )
    return dict(zip(columns, values))


def create_random_employee_info() -> dict:
    """Build one fake employee record keyed by the Employee_* column names.

    ``Employee_Age`` is derived from ``Employee_Birthdate`` (as
    ``create_random_person_info`` does for persons), so the two columns never
    contradict each other.

    Returns:
        dict: one record keyed by column name.
    """
    # Draw the birthdate once, up front; the age is computed from it rather
    # than drawn independently.
    birthdate = fake.date_of_birth(tzinfo=None, minimum_age=18, maximum_age=65)

    return {
        "Employee_Name": fake.first_name(),
        "Employee_Last_Name": fake.last_name(),
        "Employee_Age": calculate_age(birthdate),
        "Employee_Birthdate": birthdate,
        "Employee_Gender": fake.gender(),
        "Employee_Phone_Number": fake.phone_number_digits(),
        "Employee_Email": fake.email(),
        "Employee_Job_Title": fake.job(),
        "Employee_Department": fake.department(),
        "Employee_Salary": round(uniform(25000, 100000), 2),
        "Employee_Hire_Date": fake.date_between(start_date='-10y', end_date='today'),
        # Drawn through fake.unique, so the 1000-9999 range caps how many
        # distinct IDs can be produced.
        "Employee_Employee_ID": fake.unique.random_int(min=1000, max=9999, step=1)
    }


def create_random_customer_info() -> dict:
    """Build one fake customer record keyed by the Customer_* column names.

    ``Customer_Age`` is derived from ``Customer_Birthdate`` (as
    ``create_random_person_info`` does for persons), so the two columns never
    contradict each other.

    Returns:
        dict: one record keyed by column name.
    """
    # Draw the birthdate once, up front; the age is computed from it rather
    # than drawn independently.
    birthdate = fake.date_of_birth(tzinfo=None, minimum_age=18, maximum_age=65)

    return {
        "Customer_Name": fake.first_name(),
        "Customer_Last_Name": fake.last_name(),
        "Customer_Age": calculate_age(birthdate),
        "Customer_Birthdate": birthdate,
        "Customer_Gender": fake.gender(),
        "Customer_Email": fake.email(),
        "Customer_Phone_Number": fake.phone_number_digits(),
        "Customer_Address": fake.street_address(),
        "Customer_City": fake.city(),
        "Customer_Zip_Code": fake.zipcode()
    }


def create_random_order_info() -> dict:
    """Build one fake order record dated within the last 365 days."""
    # NOTE(review): Order_Number and Customer_ID call fake.unique.random_int
    # with identical arguments; presumably they draw from one shared
    # uniqueness pool - confirm against Faker's `unique` documentation.
    order_fields = [
        ("Order_Number", fake.unique.random_int(min=1000, max=9999, step=1)),
        ("Order_Date", fake.date_time_between(start_date='-365d', end_date='now')),
        ("Customer_ID", fake.unique.random_int(min=1000, max=9999, step=1)),
        ("Product_Name", fake.product_name()),
        ("Order_Quantity", randint(1, 10)),
        ("Order_Total", round(uniform(10.0, 500.0), 2)),
    ]
    return dict(order_fields)

# Define random DataFrame generation functions

In [10]:
def generate_random_df(random_info_generator: callable, custom_schema: StructType, num_records: int = 200, seed_num: int | None = None):
    """Generate ``num_records`` random records and return them as a Spark DataFrame.

    Args:
        random_info_generator: Zero-argument function returning one record as
            a dict keyed by column name.
        custom_schema: Schema applied when the records are converted to a
            DataFrame.
        num_records: Number of records to generate.
        seed_num: Seed for both ``random`` and Faker. When ``None`` a seed
            between 1 and 999 is drawn with ``randint``.

    Returns:
        The DataFrame built from the generated records.
    """
    # Set the random seed for reproducibility
    if seed_num is None:
        seed_num = randint(1, 999)
    seed(seed_num)
    Faker.seed(seed_num)

    # Values remembered by ``fake.unique`` from earlier calls would make the
    # same seed yield different data and would eventually exhaust the pool of
    # unique values, so start every run from an empty uniqueness record.
    fake.unique.clear()

    # Generate the list of dictionaries
    records = [random_info_generator() for _ in range(num_records)]

    # Distribute the records as an RDD and convert it with the given schema
    return spark.sparkContext.parallelize(records).toDF(custom_schema)

def generate_random_df_large_data(random_info_generator: callable, custom_schema: StructType, num_records: int = 400_000, checkpoint: int = 100_000, first_loop: bool = True, seed_num: int | None = None, table_path: str = r"/content/delta_table"):
    """Generate a large random dataset in chunks, staged through a Delta table.

    Records are produced ``checkpoint`` at a time. Each chunk is written to
    the Delta table at ``table_path`` before the next one is generated, so
    only one chunk of Python objects is held at once. The first chunk
    overwrites the table (when ``first_loop`` is true); every later chunk is
    appended.

    Args:
        random_info_generator: Zero-argument function returning one record as
            a dict keyed by column name.
        custom_schema: Schema applied to every chunk.
        num_records: Total number of records to generate.
        checkpoint: Maximum number of records per chunk; must be positive.
        first_loop: When true the first chunk overwrites the table, otherwise
            it is appended to whatever the table already holds.
        seed_num: Base seed. When ``None`` a value between 1 and 999 is drawn.
            Each chunk uses the next integer after the previous chunk's seed
            (the first chunk uses ``seed_num + 1``).
        table_path: Location of the Delta table used for staging.

    Returns:
        A DataFrame read back from the Delta table after all chunks are written.

    Raises:
        ValueError: If ``checkpoint`` is not positive (the chunking could
            never finish).
    """
    if checkpoint <= 0:
        raise ValueError("checkpoint must be a positive number of records")

    if seed_num is None:
        seed_num = randint(1, 999)

    if first_loop:
        # A fresh table starts from an empty uniqueness record, so leftovers
        # from earlier runs neither change seeded output nor use up the pool.
        fake.unique.clear()

    # Only the very first write may replace the table; writing each chunk
    # BEFORE producing the next keeps the overwrite from wiping later chunks.
    write_mode = "overwrite" if first_loop else "append"
    remaining = num_records

    while True:
        # A different seed per chunk, so chunks do not repeat each other
        seed_num += 1
        seed(seed_num)
        Faker.seed(seed_num)

        # Generate one chunk of dictionaries (empty if nothing is requested)
        chunk_size = max(min(remaining, checkpoint), 0)
        data = [random_info_generator() for _ in range(chunk_size)]

        # Convert the chunk to a DataFrame and persist it to the Delta table
        chunk_df = spark.sparkContext.parallelize(data).toDF(custom_schema)
        chunk_df.write.format("delta").mode(write_mode).save(table_path)

        # Drop the references so the chunk can be reclaimed before the next one
        del data, chunk_df

        remaining -= checkpoint
        write_mode = "append"
        if remaining <= 0:
            break

    # Read the complete Delta table back into a DataFrame
    return spark.read.format("delta").load(table_path)

# Define schema for random df

Data types for schema

| Data type               | Value type in Python         | API to access or create a data type       |
|-------------------------|------------------------------|------------------------------------------|
| ByteType                | int or long                   | `ByteType()`                             |
| ShortType               | int or long                   | `ShortType()`                            |
| IntegerType             | int or long                   | `IntegerType()`                          |
| LongType                | long                         | `LongType()`                             |
| FloatType               | float                        | `FloatType()`                            |
| DoubleType              | float                        | `DoubleType()`                           |
| DecimalType             | decimal.Decimal               | `DecimalType()`                          |
| StringType              | string                       | `StringType()`                           |
| BinaryType              | bytearray                    | `BinaryType()`                           |
| BooleanType             | bool                         | `BooleanType()`                          |
| TimestampType           | datetime.datetime             | `TimestampType()`                        |
| TimestampNTZType        | datetime.datetime             | `TimestampNTZType()`                     |
| DateType                | datetime.date                 | `DateType()`                             |
| DayTimeIntervalType     | datetime.timedelta            | `DayTimeIntervalType()`                  |
| ArrayType               | list, tuple, or array         | `ArrayType(elementType, [containsNull])` |
| MapType                 | dict                         | `MapType(keyType, valueType, [valueContainsNull])` |
| StructType              | list or tuple                 | `StructType(fields)`                     |
| StructField             | The value type in Python of the data type of this field (e.g., Int for a StructField with the data type IntegerType) | `StructField(name, dataType, [nullable])` |
| UserDefinedType (UDT)   | User-defined                 | -                                        |
| ObjectType              | Python object                 | -                                        |
| VectorUDT               | Vector                        | -                                        |
| ImageType               | Image                         | -                                        |
| NoneType                | None                         | -                                        |


In [11]:
# Columns of the records built by create_random_person_info; all nullable.
person_info_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Person_Name", StringType()),
        ("Person_Last_Name", StringType()),
        ("Person_Age", IntegerType()),
        ("Person_Date_Of_Birth", DateType()),
        ("Person_Gender", StringType()),
        ("Person_Phone_Number", StringType()),
        ("Person_Email", StringType()),
        ("Person_Country_Long", StringType()),
        ("Person_Zip_Code", StringType()),
        ("Person_Street", StringType()),
        ("Person_Country_Short", StringType()),
        ("Person_Income", DoubleType()),
        ("Person_Marital_Status", StringType()),
        ("Person_Employment_Status", StringType()),
    )
])

# Columns of the records built by create_random_company_info; all nullable.
company_info_schema = StructType([
    StructField("Company_Name", StringType(), True),
    StructField("Company_Type", StringType(), True),
    StructField("Industry", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("Employee_Count", IntegerType(), True),
    # Revenue goes up to 100,000,000 with two decimals. A 32-bit FloatType
    # cannot hold that many significant digits (the cents were lost), so the
    # column is a double, like Person_Income.
    StructField("Revenue", DoubleType(), True),
    StructField("Website", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Founded_Date", DateType(), True),
    StructField("CEO", StringType(), True),
    StructField("Contact_Number", StringType(), True),
    StructField("Rating", FloatType(), True),
    StructField("Legal_Status", StringType(), True)
])

# Columns of the records built by create_random_product_info_v1; all nullable.
product_info_v1_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("ID", StringType()),
        ("Subsector", StringType()),
        ("Category", StringType()),
        ("Brand", StringType()),
        ("Material_ID", IntegerType()),
        ("Description", StringType()),
        ("Plant_Code", StringType()),
        ("Plant_Name", StringType()),
        ("Validity_Date_From", DateType()),
        ("Validity_Date_To", DateType()),
        ("Modification_Date", DateType()),
        ("Status", StringType()),
    )
])

# Columns of the records built by create_random_product_info_v2; all nullable.
product_info_v2_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Product_Name", StringType()),
        ("Product_Category", StringType()),
        ("Product_Description", StringType()),
        ("Product_Price", FloatType()),
        ("Product_Stock", IntegerType()),
        ("Product_Manufacturer", StringType()),
        ("Product_Weight", FloatType()),
        ("Product_Rating", FloatType()),
        ("Product_SKU", StringType()),
    )
])

# Columns of the records built by create_random_employee_info; all nullable.
employee_info_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Employee_Name", StringType()),
        ("Employee_Last_Name", StringType()),
        ("Employee_Age", IntegerType()),
        ("Employee_Birthdate", DateType()),
        ("Employee_Gender", StringType()),
        ("Employee_Phone_Number", StringType()),
        ("Employee_Email", StringType()),
        ("Employee_Job_Title", StringType()),
        ("Employee_Department", StringType()),
        ("Employee_Salary", FloatType()),
        ("Employee_Hire_Date", DateType()),
        ("Employee_Employee_ID", IntegerType()),
    )
])

# Columns of the records built by create_random_customer_info; all nullable.
customer_info_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Customer_Name", StringType()),
        ("Customer_Last_Name", StringType()),
        ("Customer_Age", IntegerType()),
        ("Customer_Birthdate", DateType()),
        ("Customer_Gender", StringType()),
        ("Customer_Email", StringType()),
        ("Customer_Phone_Number", StringType()),
        ("Customer_Address", StringType()),
        ("Customer_City", StringType()),
        ("Customer_Zip_Code", StringType()),
    )
])

# Columns of the records built by create_random_order_info; all nullable.
order_info_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Order_Number", IntegerType()),
        ("Order_Date", DateType()),
        ("Customer_ID", IntegerType()),
        ("Product_Name", StringType()),
        ("Order_Quantity", IntegerType()),
        ("Order_Total", FloatType()),
    )
])

In [13]:
# Seed both random sources (Faker and the `random` module) with a fixed value.
# generate_random_df seeds them again itself; because no seed_num is passed
# below, it draws that seed with randint() from the state set here.
seed_num = 1
Faker.seed(seed_num)
seed(seed_num)

# Registry of available datasets: label -> [record generator, matching schema]
summary_dict = {
    "generate_person_info":[create_random_person_info, person_info_schema],
    "generate_company_info":[create_random_company_info, company_info_schema],
    "generate_product_info":[create_random_product_info_v1, product_info_v1_schema],
    "generate_product_info_v2":[create_random_product_info_v2, product_info_v2_schema],
    "generate_employee_info":[create_random_employee_info, employee_info_schema],
    "generate_customer_info":[create_random_customer_info, customer_info_schema],
    "generate_order_info":[create_random_order_info, order_info_schema]
    }

# Pick the dataset to generate by its label
create_random_info, info_schema = summary_dict["generate_order_info"]

# Build a 20-record DataFrame from the selected generator/schema pair
df = generate_random_df(create_random_info, info_schema, 20)
# df = generate_random_df_large_data(create_random_info, info_schema, 2_000_000)

# Print the generated rows
df.show()

+------------+----------+-----------+--------------------+--------------+-----------+
|Order_Number|Order_Date|Customer_ID|        Product_Name|Order_Quantity|Order_Total|
+------------+----------+-----------+--------------------+--------------+-----------+
|        1024|2024-01-13|       9828| Premium Bed - 00198|            10|     446.67|
|        5971|2023-10-28|       9964| Stylish Bed - 86084|             1|     267.64|
|        1124|2023-08-08|       6602|Stylish Lamp - 44821|            10|      460.7|
|        5566|2023-06-05|       3442|Premium Lamp - 19540|             4|     286.42|
|        5447|2023-11-28|       5869|Innovative Table ...|             6|     259.02|
|        9050|2023-03-20|       9326|Luxurious Lamp - ...|             9|     187.63|
|        1213|2023-08-09|       6215|Premium Lamp - 14241|             9|     496.23|
|        9743|2023-06-09|       2780|Premium Sofa - 00085|             5|     390.16|
|        9946|2023-05-01|       9282|Luxurious Bed - 0