<a href="https://colab.research.google.com/github/PawelJakubczyk/Manufacturing-Revenue-for-EU-Export/blob/main/generate_random_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Instal library

In [106]:
%pip install faker
%pip install pyspark



# Import library & star session

In [107]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType, FloatType
from datetime import datetime
from faker import Faker
from faker.providers import BaseProvider
from random import choice, uniform, randint, seed

# Create a Spark session
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Initialize Faker for generating fake data
fake = Faker()

# Define class and functions

### Define provider and Utilities functions

<style>
  .tables {
    display: flex;
    justify-content: space-between;
  }
</style>

<div class="tables">

| Function           | Description                                                                |
|--------------------|----------------------------------------------------------------------------|
| `seed()`           | Initialize the random number generator                                    |
| `getstate()`       | Returns the current internal state of the random number generator          |
| `setstate()`       | Restores the internal state of the random number generator                 |
| `getrandbits()`    | Returns a number representing the random bits                               |
| `randrange()`      | Returns a random number between the given range with an optional step argument |
| `randint()`        | Returns a random number between the given range                              |
| `choice()`         | Returns a random element from the given sequence                            |
| `choices()`        | Returns a list with a random selection from the given sequence              |
| `shuffle()`        | Takes a sequence and returns the sequence in a random order                  |
| `sample()`         | Returns a given sample of a sequence                                         |
| `random()`         | Returns a random float number between 0 and 1                                |
| `uniform()`        | Returns a random float number between two given parameters                   |
| `triangular()`     | Returns a random float number between two given parameters with an optional mode parameter |
| `betavariate()`    | Returns a random float number between 0 and 1 based on the Beta distribution (used in statistics) |
| `expovariate()`    | Returns a random float number based on the Exponential distribution (used in statistics) |
| `gammavariate()`   | Returns a random float number based on the Gamma distribution (used in statistics) |
| `gauss()`          | Returns a random float number based on the Gaussian distribution (used in probability theories) |
| `lognormvariate()` | Returns a random float number based on a log-normal distribution (used in probability theories) |
| `normalvariate()`  | Returns a random float number based on the normal distribution (used in probability theories) |
| `vonmisesvariate()`| Returns a random float number based on the von Mises distribution (used in directional statistics) |
| `paretovariate()`  | Returns a random float number based on the Pareto distribution (used in probability theories) |
| `weibullvariate()` | Returns a random float number based on the Weibull distribution (used in statistics) |

</div>
<div class="tables">

| Faker Method                | Description                                       |
|-----------------------------|---------------------------------------------------|
| `fake.name()`               | Generate a random full name.                      |
| `fake.address()`            | Generate a random address.                         |
| `fake.email()`              | Generate a random email address.                  |
| `fake.phone_number()`       | Generate a random phone number.                   |
| `fake.date_of_birth()`      | Generate a random date of birth.                  |
| `fake.company()`            | Generate a random company name.                   |
| `fake.job()`                | Generate a random job title.                      |
| `fake.text()`               | Generate random text.                             |
| `fake.sentence()`           | Generate a random sentence.                       |
| `fake.word()`               | Generate a random word.                           |
| `fake.color_name()`         | Generate a random color name.                     |
| `fake.credit_card_number()` | Generate a random credit card number.             |
| `fake.url()`                | Generate a random URL.                            |
| `fake.domain_name()`        | Generate a random domain name.                    |
| `fake.ipv4()`               | Generate a random IPv4 address.                  |
| `fake.country()`            | Generate a random country name.                   |
| `fake.city()`               | Generate a random city name.                      |
| `fake.state()`              | Generate a random state name.                     |
| `fake.zipcode()`            | Generate a random ZIP code.                       |
| `fake.currency_code()`      | Generate a random currency code.                  |
| `fake.file_name()`          | Generate a random file name.                      |
| `fake.image_url()`          | Generate a random image URL.                      |
| `fake.random_element()`     | Generate a random element from a list.           |
| `fake.random_number()`      | Generate a random number.                         |
| `fake.boolean()`            | Generate a random boolean value.                  |
| `fake.paragraph()`          | Generate a random paragraph.                      |
| `fake.word_list()`          | Generate a list of random words.                 |
| `fake.unique.random_number()`| Generate a unique random number.                 |
| `fake.unique.random_element()`| Generate a unique random element from a list.   |

</div>

In [108]:
class CustomProvider(BaseProvider):

    def subsector(self):
        subsectors = ["Bakery Supplies", "Candy & Chocolate", "Dessert Mixes", "Ice Cream Ingredients",
                      "Packaged Sweets", "Pastry Ingredients", "Sweeteners", "Toppings & Decorations"]
        return self.random_element(subsectors)

    def category(self):
        categories = ["Cake", "Cookie", "Chocolate", "Pastry", "Candy", "Dessert", "Sweet", "Fudge", "Treat", "Confectionery",
                      "Bakery", "Pie", "Ice Cream", "Caramel", "Marshmallow", "Gumdrop", "Lollipop", "Truffle", "Pudding",
                      "Macaron", "Cupcake", "Brownie", "Tart", "Donut", "Frosting", "Creme", "Jelly", "Syrup", "Fruitcake"]
        return self.random_element(categories)

    def brand(self):
        brands = ["Zypher", "Exquinox", "Vintura", "Quentro", "Travala", "Nexxon", "Delisio", "Zemora", "Aurora", "Novata"]
        return self.random_element(brands)

fake.add_provider(CustomProvider)

### Define random dataframe generate functions

In [109]:
def generate_random_df(random_info_generator: callable, custom_schema:StructType, num_records:int = 200, seed_num:int = None):
    # Set the random seed for reproducibility
    if seed_num is None: seed_num = randint(1, 999)
    seed(seed_num)
    Faker.seed(seed_num)

    # Generate the list of dictionaries
    data = [random_info_generator() for _ in range(num_records)]
    # Create an RDD from the list of dictionaries
    rdd = spark.sparkContext.parallelize(data)
    # Convert the RDD to a DataFrame
    return rdd.toDF(custom_schema)

def generate_random_df_large_data(random_info_generator: callable, custom_schema: StructType, num_records: int = 400_000, checkpoint: int = 400_000, first_loop: bool = True, seed_num:int = None):
    # Set the random seed for reproducibility
    if seed_num is None: seed_num = randint(1, 999)
    seed_num += 1
    seed(seed_num)
    Faker.seed(seed_num)

    # Generate a list of dictionaries using the provided random_info_generator function
    data = [random_info_generator() for _ in range(min(num_records, checkpoint))]

    # Convert the list of dictionaries to an RDD
    rdd = spark.sparkContext.parallelize(data)

    # Convert the RDD to a DataFrame using the provided custom_schema
    df = rdd.toDF(custom_schema)

    # Checkpoint the DataFrame every `checkpoint` records
    # This helps to manage memory usage and avoid out-of-memory errors
    # when dealing with large datasets
    if num_records > checkpoint:
        num_records -= checkpoint
        generate_random_df(random_info_generator, custom_schema, num_records, checkpoint, False)

    # Write the DataFrame to a delta table
    # If this is the first loop, overwrite the existing table
    # Otherwise, append the new records to the existing table
    match first_loop:
        case True: df.write.format("delta").mode("overwrite").save(r"/content/delta_table")
        case False: df.write.format("delta").mode("append").save(r"/content/delta_table")

    # Delete the DataFrame from memory to free up resources
    del df

    # Read the delta table back into a DataFrame
    df_final = spark.read.format("delta").load(r"/content/delta_table")

    # Return the final DataFrame
    return df_final

### Define functions for data generation

In [110]:
def create_random_product_info() -> dict:
    """
    Function to create a dictionary containing random product information.
    """
    return {
        "Product_ID": str(randint(1000, 2000)),
        "Subsector": fake.subsector(),
        "Category": fake.category(),
        "Brand": fake.brand(),
        "Material_ID": randint(10000, 99999),
        "Description": fake.text(max_nb_chars=100),
        "Weight": float(round(fake.random_number(digits=2, fix_len=False), 2)),
        "Shelf_Life": randint(30, 365),
        "Supplier_ID": randint(1, 35),
    }

def generate_random_transaction_info() -> dict:
    """
    Function to generate a dictionary containing random transaction information.
    """
    start_date = datetime.strptime("2012-01-01", '%Y-%m-%d')
    end_date = datetime.strptime("2030-12-31", '%Y-%m-%d')
    validity_date_from = fake.date_time_between(start_date=start_date, end_date=end_date)
    validity_date_to = fake.date_time_between(start_date=validity_date_from, end_date=end_date)
    sales_end_date = validity_date_to if randint(1, 100) <= 80 else None

    return {
        "Supplier_ID": randint(1, 35),
        "Countries_of_Sale": fake.country(),
        "Transaction_ID": choice(uuid_list),
        "Sales_Initiation_Date": validity_date_from,
        "Sales_End_Date": sales_end_date,
        "Product_ID": str(randint(1000, 2000)),
    }

def generate_financial_data_info() -> dict:
    """
    Function to generate financial data information.
    """
    profit_ytd = round(uniform(10000, 1000000), 2)
    profit_last_2_years = round(profit_ytd + uniform(0, 500000), 2)
    profit_last_year = round(uniform(profit_last_2_years, profit_ytd), 2)
    profit_last_month = round(uniform(profit_last_year * 0.8, profit_last_year), 2)
    profit_last_week = round(uniform(profit_last_month * 0.8, profit_last_month), 2)
    market_value = round(uniform(profit_last_week * 1.2, profit_last_week * 2), 2)

    return {
        "Transaction_ID": choice(uuid_list),
        "Profit_YTD": profit_ytd,
        "Profit_Last_2_Years": profit_last_2_years,
        "Profit_Last_Year": profit_last_year,
        "Profit_Last_Month": profit_last_month,
        "Profit_Last_Week": profit_last_week,
        "Market_Value": market_value
    }

# Define variable for data generation

In [111]:
uuid_list = [fake.uuid4() for _ in range(350_000)]

# Define schema for data generation

Data type for schema

| Data type               | Value type in Python         | API to access or create a data type       |
|-------------------------|------------------------------|------------------------------------------|
| ByteType                | int or long                   | `ByteType()`                             |
| ShortType               | int or long                   | `ShortType()`                            |
| IntegerType             | int or long                   | `IntegerType()`                          |
| LongType                | long                         | `LongType()`                             |
| FloatType               | float                        | `FloatType()`                            |
| DoubleType              | float                        | `DoubleType()`                           |
| DecimalType             | decimal.Decimal               | `DecimalType()`                          |
| StringType              | string                       | `StringType()`                           |
| BinaryType              | bytearray                    | `BinaryType()`                           |
| BooleanType             | bool                         | `BooleanType()`                          |
| TimestampType           | datetime.datetime             | `TimestampType()`                        |
| TimestampNTZType        | datetime.datetime             | `TimestampNTZType()`                     |
| DateType                | datetime.date                 | `DateType()`                             |
| DayTimeIntervalType     | datetime.timedelta            | `DayTimeIntervalType()`                  |
| ArrayType               | list, tuple, or array         | `ArrayType(elementType, [containsNull])` |
| MapType                 | dict                         | `MapType(keyType, valueType, [valueContainsNull])` |
| StructType              | list or tuple                 | `StructType(fields)`                     |
| StructField             | The value type in Python of the data type of this field (e.g., Int for a StructField with the data type IntegerType) | `StructField(name, dataType, [nullable])` |
| UserDefinedType (UDT)   | User-defined                 | -                                        |
| ObjectType              | Python object                 | -                                        |
| VectorUDT               | Vector                        | -                                        |
| ImageType               | Image                         | -                                        |
| NoneType                | None                         | -                                        |


In [112]:
product_schema = StructType([
    StructField("Product_ID", StringType(), nullable=True),
    StructField("Subsector", StringType(), nullable=True),
    StructField("Category", StringType(), nullable=True),
    StructField("Brand", StringType(), nullable=True),
    StructField("Material_ID", IntegerType(), nullable=True),
    StructField("Description", StringType(), nullable=True),
    StructField("Weight", FloatType(), nullable=True),
    StructField("Shelf_Life", IntegerType(), nullable=True),
    StructField("Supplier_ID", IntegerType(), nullable=True)
])

product_suplier_schema = StructType([
    StructField("Supplier_ID", IntegerType(), True),
    StructField("Supplier_Name", StringType(), True),
    StructField("Country_of_Distribution", StringType(), True),
])

transaction_schema = StructType([
    StructField("Supplier_ID", IntegerType(), nullable=False),
    StructField("Countries_of_Sale", StringType(), nullable=False),
    StructField("Transaction_ID", StringType(), nullable=False),
    StructField("Sales_Initiation_Date", DateType(), True),
    StructField("Sales_End_Date", DateType(), True),
    StructField("Product_ID", StringType(), True),
])


financial_schema = StructType([
    StructField("Transaction_ID", StringType(), nullable=False),
    StructField("Profit_YTD", FloatType(), nullable=False),
    StructField("Profit_Last_2_Years", FloatType(), nullable=False),
    StructField("Profit_Last_Year", FloatType(), nullable=False),
    StructField("Profit_Last_Month", FloatType(), nullable=False),
    StructField("Profit_Last_Week", FloatType(), nullable=False),
    StructField("Market_Value", FloatType(), nullable=False)
])

eu_countries_schema = StructType([
    StructField("Country", StringType(), True),
    StructField("Country_Code", StringType(), True)
])

# Generate random data

### Generate randome product

In [113]:
df_product = generate_random_df(create_random_product_info, product_schema, 100_000)
df_product = df_product.drop_duplicates(['Material_ID'])
df_product = df_product.drop_duplicates(['Product_ID'])
df_product.coalesce(1).write.mode('overwrite').option("header", "true").csv("/content/product.csv")
print('count: ', df_product.count())
df_product.show()
del df_product

count:  1001
+----------+--------------------+-----------+--------+-----------+--------------------+------+----------+-----------+
|Product_ID|           Subsector|   Category|   Brand|Material_ID|         Description|Weight|Shelf_Life|Supplier_ID|
+----------+--------------------+-----------+--------+-----------+--------------------+------+----------+-----------+
|      1000|       Dessert Mixes|       Tart|  Zypher|      11828|Understand good n...|  35.0|       354|         12|
|      1001|       Dessert Mixes|        Pie|  Zypher|      13050|That religious fo...|   9.0|       278|         13|
|      1002|     Bakery Supplies|    Gumdrop|  Nexxon|      10442|Up dog gun these....|  10.0|       120|         21|
|      1003|          Sweeteners|      Creme| Vintura|      19572|Age attention awa...|  32.0|        84|         23|
|      1004|          Sweeteners|     Bakery|  Zypher|      10190|Music partner mon...|  88.0|        94|         15|
|      1005|          Sweeteners|Marshmallo

### Generate randome suplier mapping

In [114]:
suplier_data = [
    (1, 'Taylor and Sons', fake.country_code()),
    (2, 'Fischer, Harper and Collins', fake.country_code()),
    (3, 'Burns, Brewer and Valdez', fake.country_code()),
    (4, 'Li-Harris', fake.country_code()),
    (5, 'Thompson-Medina', fake.country_code()),
    (6, 'Brock Ltd', fake.country_code()),
    (7, 'Rocha-Harris', fake.country_code()),
    (8, 'Andrade, Boone and Alvarez', fake.country_code()),
    (9, 'Hurst Ltd', fake.country_code()),
    (10, 'Owens-Morrison', fake.country_code()),
    (11, 'Jones, Bradley and Davis', fake.country_code()),
    (12, 'Nichols, Mejia and Smith', fake.country_code()),
    (13, 'Miller, Jones and Moyer', fake.country_code()),
    (14, 'Good and Sons', fake.country_code()),
    (15, 'Rodgers-Johnston', fake.country_code()),
    (16, 'Potts, Allen and Jones', fake.country_code()),
    (17, 'Franklin, Weber and Parrish', fake.country_code()),
    (18, 'Carter, Martinez and Sullivan', fake.country_code()),
    (19, 'Ryan and Sons', fake.country_code()),
    (20, 'Paul, King and Nichols', fake.country_code()),
    (21, 'Marshall-Patterson', fake.country_code()),
    (22, 'Carr-Griffith', fake.country_code()),
    (23, 'Russell, Newman and Long', fake.country_code()),
    (24, 'Reed-Long', fake.country_code()),
    (25, 'Walters, Mcguire and Hampton', fake.country_code()),
    (26, 'Martinez and Sons', fake.country_code()),
    (27, 'Whitehead-Berg', fake.country_code()),
    (28, 'Ross, Hopkins and Thomas', fake.country_code()),
    (29, 'Garza-Scott', fake.country_code()),
    (30, 'Williams-Turner', fake.country_code()),
    (31, 'Taylor-Anderson', fake.country_code()),
    (32, 'Rivera-Edwards', fake.country_code()),
    (33, 'Johns Group', fake.country_code()),
    (34, 'Guzman, Page and Gregory', fake.country_code()),
    (35, 'Gordon Inc', fake.country_code())
]

df_suplier = spark.createDataFrame(suplier_data, schema=product_suplier_schema)
df_suplier.coalesce(1).write.mode('overwrite').option("header", "true").csv("/content/suplier_mapping.csv")
print('count: ', df_suplier.count())
df_suplier.show()
del df_suplier

count:  35
+-----------+--------------------+-----------------------+
|Supplier_ID|       Supplier_Name|Country_of_Distribution|
+-----------+--------------------+-----------------------+
|          1|     Taylor and Sons|                     BH|
|          2|Fischer, Harper a...|                     PG|
|          3|Burns, Brewer and...|                     SE|
|          4|           Li-Harris|                     HU|
|          5|     Thompson-Medina|                     PS|
|          6|           Brock Ltd|                     IQ|
|          7|        Rocha-Harris|                     SA|
|          8|Andrade, Boone an...|                     KP|
|          9|           Hurst Ltd|                     BR|
|         10|      Owens-Morrison|                     LY|
|         11|Jones, Bradley an...|                     CF|
|         12|Nichols, Mejia an...|                     BN|
|         13|Miller, Jones and...|                     MN|
|         14|       Good and Sons|           

### Generate randome transactions

In [115]:
df_transactions = generate_random_df(generate_random_transaction_info, transaction_schema, 300_000)
df_transactions = df_transactions.dropDuplicates(['Transaction_ID'])
df_transactions.coalesce(1).write.mode('overwrite').option("header", "true").csv("/content/transactions.csv")
print('count: ', df_transactions.count())
df_transactions.show()
del df_transactions

count:  201372
+-----------+--------------------+--------------------+---------------------+--------------+----------+
|Supplier_ID|   Countries_of_Sale|      Transaction_ID|Sales_Initiation_Date|Sales_End_Date|Product_ID|
+-----------+--------------------+--------------------+---------------------+--------------+----------+
|         35|               Haiti|00007086-7262-45c...|           2022-02-23|    2030-03-22|      1265|
|          5|             Andorra|0000b315-34e2-415...|           2027-04-12|    2027-07-13|      1795|
|         17|             Croatia|0001b030-060d-456...|           2025-01-08|          NULL|      1058|
|         19|        Sierra Leone|00029ad0-745f-45d...|           2029-04-26|    2030-07-03|      1665|
|         10|          Cape Verde|000317a9-4c78-416...|           2025-10-28|    2030-04-07|      1792|
|          2|            Botswana|0003270c-b986-4bb...|           2016-06-20|    2022-08-26|      1104|
|         35|         El Salvador|0003e80b-3f22-4

### Generate randome financial data

In [116]:
df_financial = generate_random_df(generate_financial_data_info, financial_schema, 300_000)
df_financial = df_financial.dropDuplicates(['Transaction_ID'])
df_financial.coalesce(1).write.mode('overwrite').option("header", "true").csv("/content/financial.csv")
print('count: ', df_financial.count())
df_financial.show()
del df_financial

count:  201595
+--------------------+----------+-------------------+----------------+-----------------+----------------+------------+
|      Transaction_ID|Profit_YTD|Profit_Last_2_Years|Profit_Last_Year|Profit_Last_Month|Profit_Last_Week|Market_Value|
+--------------------+----------+-------------------+----------------+-----------------+----------------+------------+
|3c1c77b9-e3d6-419...|  736166.4|          1155648.9|       1122977.1|        1016646.5|       945933.75|   1189308.0|
|94cd1c6b-d102-482...|  131614.8|           608580.0|        489201.2|        484492.06|        421488.2|    725939.3|
|5ed2087d-2ff5-45e...|  13278.79|          132120.69|        70182.12|         65641.54|        56273.33|     94767.4|
|f711baea-3a14-498...|  561936.0|           780491.0|        665421.8|        630282.44|       586620.25|   977413.56|
|1a65a089-f974-448...| 314951.38|           713500.2|       582955.25|        492789.06|       471078.78|   800038.75|
|ea0a480d-247b-446...|  494954.8|

### Generate Eu Country mapping

In [117]:
eu_country_data = [
    ("Austria", "AT"),
    ("Belgium", "BE"),
    ("Bulgaria", "BG"),
    ("Croatia", "HR"),
    ("Cyprus", "CY"),
    ("Czech Republic", "CZ"),
    ("Denmark", "DK"),
    ("Estonia", "EE"),
    ("Finland", "FI"),
    ("France", "FR"),
    ("Greece", "GR"),
    ("Spain", "ES"),
    ("Netherlands", "NL"),
    ("Ireland", "IE"),
    ("Lithuania", "LT"),
    ("Luxembourg", "LU"),
    ("Latvia", "LV"),
    ("Malta", "MT"),
    ("Germany", "DE"),
    ("Poland", "PL"),
    ("Portugal", "PT"),
    ("Romania", "RO"),
    ("Slovakia", "SK"),
    ("Slovenia", "SI"),
    ("Sweden", "SE"),
    ("Hungary", "HU"),
    ("Italy", "IT")
]

df_eu_country_mapping = spark.createDataFrame(eu_country_data, eu_countries_schema)
df_eu_country_mapping.coalesce(1).write.mode('overwrite').option("header", "true").csv("/content/eu_mapping.csv")
print('count: ', df_eu_country_mapping.count())
df_eu_country_mapping.show()
del df_eu_country_mapping

count:  27
+--------------+------------+
|       Country|Country_Code|
+--------------+------------+
|       Austria|          AT|
|       Belgium|          BE|
|      Bulgaria|          BG|
|       Croatia|          HR|
|        Cyprus|          CY|
|Czech Republic|          CZ|
|       Denmark|          DK|
|       Estonia|          EE|
|       Finland|          FI|
|        France|          FR|
|        Greece|          GR|
|         Spain|          ES|
|   Netherlands|          NL|
|       Ireland|          IE|
|     Lithuania|          LT|
|    Luxembourg|          LU|
|        Latvia|          LV|
|         Malta|          MT|
|       Germany|          DE|
|        Poland|          PL|
+--------------+------------+
only showing top 20 rows

