In [0]:
%pip install faker

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
from pyspark.sql.datasource import InputPartition

class RangePartition(InputPartition):
    """Partition descriptor holding a ``start``/``end`` pair of numbers.

    The two values are stored untouched as attributes; how they are
    interpreted is up to the reader that receives the partition.
    """

    def __init__(self, start, end):
        # Keep both bounds exactly as supplied by the caller.
        self.start, self.end = start, end

class FakeDataSourceReader(DataSourceReader):
    """Batch reader that fabricates card-transaction rows with ``faker``.

    Each row is a 6-tuple of strings:
    ``(transaction id, client id, amount, date, merchant, status)``.
    """

    def __init__(self, schema, options):
        """Store the schema and the data source options for later use.

        Args:
            schema: Schema handed over by the data source.
            options: Data source options; every value in it is a string.
        """
        self.schema: StructType = schema
        self.options = options

    def partitions(self):
        """Return the five fixed partitions the read is split into.

        Each partition carries a ``start``/``end`` pair that ``read`` uses
        as the bounds for the number of rows it generates.
        """
        return [
            RangePartition(1, 10000),
            RangePartition(10001, 20000),
            RangePartition(20001, 30000),
            RangePartition(30001, 40000),
            RangePartition(40001, 60000),
        ]

    def read(self, partition):
        """Yield a random number of fake transaction tuples for ``partition``.

        Args:
            partition: Object exposing ``start`` and ``end`` attributes.

        Yields:
            Tuples of six strings: transaction id, client id, amount,
            transaction date (``YYYY-MM-DD``), merchant name and status.
        """
        # Library imports must be within the method.
        from faker import Faker
        import random
        import datetime

        fake = Faker()

        # Loop invariants, computed once instead of once per generated row.
        start_date = datetime.datetime.strptime("2000-01-01", '%Y-%m-%d')
        # A timedelta end date is resolved by Faker relative to today,
        # so this presumably means "up to tomorrow" - confirm if it matters.
        end_date = datetime.timedelta(days=1)
        # Ten "approved" entries against one "declined" skews the status mix.
        statuses = ["approved", "approved","approved","approved","approved","approved","approved","approved","approved","approved","declined"]

        # NOTE(review): the row count is drawn at random between the partition
        # bounds (inclusive), so later partitions produce many more rows than
        # earlier ones. Confirm this skew is intended rather than generating
        # `end - start + 1` rows per partition.
        num_transactions = random.randint(partition.start, partition.end)
        for _ in range(num_transactions):
            transaction_id = fake.unique.uuid4()
            client_id = random.randint(1, 2000)
            tran_date = fake.date_between(
                start_date=start_date,
                end_date=end_date,
            )
            yield (
                str(transaction_id),
                str(client_id),
                # Integer in [1, 100000] divided by 10 gives one decimal place.
                str(fake.random_int(min=1, max=100000)/10),
                tran_date.strftime('%Y-%m-%d'),
                fake.company(),
                random.choice(statuses),
            )

In [0]:
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    Sample batch data source that serves synthetic rows generated with the
    `faker` library.
    """

    @classmethod
    def name(cls):
        """Return the short format name of this data source."""
        return "fake"

    def schema(self):
        """Return the schema as a DDL string; every column is a string."""
        column_defs = (
            "transaction_id string",
            "card_id string",
            "transaction_amount string",
            "transaction_date string",
            "merchant string",
            "status string",
        )
        return ", ".join(column_defs)

    def reader(self, schema: StructType):
        """Create the batch reader, passing on the schema and the options."""
        batch_reader = FakeDataSourceReader(schema, self.options)
        return batch_reader

In [0]:
# Register the custom data source class with the Spark session so that
# reads can refer to it by its format name.
spark.dataSource.register(FakeDataSource)

In [0]:
from datetime import datetime

# Load a DataFrame from the data source registered under the "fake" format.
df = (
    spark.read
    .format("fake")
    .load()
)

# Second-resolution timestamp from the local clock; used as the name of the
# output folder so each run lands in its own directory.
ts = datetime.now().strftime("%Y%m%d%H%M%S")

(
    df.write
    .mode("append")
    .parquet(f"/Volumes/sandbox/bronze/landing/{ts}")
)