In [None]:
from dbldatagen import DataGenerator, PyfuncText, DateRange
from faker import Faker
import random
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType
from datetime import datetime, timedelta
from pyspark.sql.functions import expr

# Sizing knobs for the generated dataset: total row count and the number of
# Spark partitions requested from the generator.
data_rows = 10
partitions_requested = 2

# Obtain the SparkSession for this job (an already-active one is reused).
spark = (
    SparkSession.builder
    .appName("DataGeneration")
    .getOrCreate()
)

# Schema of the synthetic data, built from a compact (name, type, nullable)
# table. Column names are kept exactly as the downstream column specs expect
# them, including their existing spellings.
schema = StructType([
    StructField(column_name, column_type(), is_nullable)
    for column_name, column_type, is_nullable in (
        ("brand", StringType, True),
        ("order_item_id", IntegerType, True),
        ("sku_id", IntegerType, True),
        ("sub_brand", StringType, True),
        ("channel_name", StringType, True),
        ("asin", StringType, True),
        ("fsn", StringType, True),
        ("return_reason_code", StringType, True),
        ("return_id", IntegerType, True),
        ("Return_Created_Date", TimestampType, True),
        ("Return_Type", StringType, True),
        ("Order_id", IntegerType, True),
        ("Return_reason", StringType, True),
        ("Tracking_no", StringType, True),
        ("Order_Date", TimestampType, True),
        ("Order_amount", StringType, True),
        ("Status", StringType, True),
        ("Category", StringType, True),
        ("Confrimed_Date", TimestampType, True),
        ("Return_Close_Date", TimestampType, True),
        ("logistics_name", StringType, True),
        ("refund_amount", StringType, True),
        ("reverse_retum_tracking_number", StringType, True),
        ("ext_return_no", StringType, True),
        ("invoice_no", StringType, True),
        ("fulfillment_type", StringType, True),
        ("pincode", IntegerType, True),
        ("platform", StringType, True),
        ("label_cost", StringType, True),
        ("label_type", StringType, True),
        ("label_to_be_paid_by", StringType, True),
        ("Return_Delivery_Date", TimestampType, True),
        ("style_id", IntegerType, True),
        ("EAN", StringType, True),
        ("Company", StringType, True),
        ("3pl_delivery_Status", StringType, True),
        ("FWD_seller_Order_ID", StringType, True),
        ("FWD_PO_NO", StringType, True),
        ("FWD_PO_DATE", TimestampType, True),
        ("FWD_B2B_INVOICE_No", StringType, True),
        ("FWD_B2B_INVOICE_Date", TimestampType, True),
        ("FWD_B2B_Invoice_Amt", DoubleType, True),
        ("FWD_Carrier_Name", StringType, True),
        ("FWD_AWB", StringType, True),
        ("warehouse_id", IntegerType, True),
        ("mytra_sku_code", StringType, True),
        ("exchange_id", IntegerType, True),
        ("store_packet_id", IntegerType, True),
        ("master_bag_id", IntegerType, True),
        ("Imdo_status", StringType, True),
        ("Imdo_modified_last_on", TimestampType, True),
        ("ShippingMethod", StringType, True),
        ("ManifestByDate", TimestampType, True),
        ("ManifestByCode", StringType, True),
        ("Inbound_No", StringType, True),
        ("GRN_No", StringType, True),
        ("GRN_Date", TimestampType, True),
        ("GRN_Value", StringType, True),
        ("Damage_Qty", IntegerType, True),
        # item_id is the only column declared non-nullable.
        ("item_id", IntegerType, False),
        ("created_at", TimestampType, True),
        ("updated_at", TimestampType, True),
    )
])

# Custom function to generate updated_at >= created_at
def generate_updated_at(created_at, latest=datetime(2023, 12, 31)):
    """Return a random timestamp that is never earlier than ``created_at``.

    A whole number of days, chosen uniformly between 0 and the number of
    whole days separating ``created_at`` from ``latest``, is added to
    ``created_at``.

    Args:
        created_at: Naive ``datetime`` to start from.
        latest: Upper bound used to size the random offset. Defaults to
            2023-12-31 00:00:00, the bound that was previously hard-coded.

    Returns:
        A ``datetime`` ``>= created_at``. It is also ``<= latest`` whenever
        ``created_at <= latest``; otherwise ``created_at`` is returned
        unchanged.
    """
    # timedelta.days is negative as soon as created_at is past `latest`
    # (even by a few hours), and random.randint(0, negative) raises
    # ValueError, so clamp the span to zero.
    max_days_difference = max(0, (latest - created_at).days)
    return created_at + timedelta(days=random.randint(0, max_days_difference))

# Create a DataGenerator for synthetic data generation.
# Every column declared in `schema` gets an explicit spec below:
#   * integer columns use a minValue/maxValue/step range,
#   * categorical string columns pick one of a fixed list via random.choice,
#   * free-form string columns are produced by Faker,
#   * string "amount" columns hold a random value in [100, 1000] rounded to 2 decimals,
#   * timestamp columns are drawn at random from a 2010 .. 2023-12-31 DateRange in one-day steps.
# NOTE(review): the positional "timestamp" argument on the timestamp specs may bind to
# withColumnSpec's second parameter rather than a type name - confirm against the
# installed dbldatagen version (the column types already come from `schema`).
item_data_generator = (
    DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
    .withSchema(schema)
    .withColumnSpec("brand", text=PyfuncText(lambda context, v: random.choice(["b1", "b2", "b3"])))
    .withColumnSpec("order_item_id", minValue=1, maxValue=10000, step=1)
    .withColumnSpec("sku_id", minValue=1000, maxValue=9999, step=1)
    .withColumnSpec("sub_brand", text=PyfuncText(lambda context, v: random.choice(["sb1", "sb2", "sb3"])))
    .withColumnSpec("channel_name", text=PyfuncText(lambda context, v: random.choice(["Channel1", "Channel2", "Channel3"])))
    .withColumnSpec("asin", text=PyfuncText(lambda context, v: Faker().ean13()))  # Adjust data generation as needed
    .withColumnSpec("fsn", text=PyfuncText(lambda context, v: Faker().ean8()))  # Adjust data generation as needed
    .withColumnSpec("return_reason_code", text=PyfuncText(lambda context, v: Faker().word()))  # Adjust data generation as needed
    .withColumnSpec("return_id", minValue=1000, maxValue=9999, step=1)
    .withColumnSpec("Return_Created_Date", "timestamp", data_range=DateRange("2010-01-01 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
    .withColumnSpec("Return_Type", text=PyfuncText(lambda context, v: random.choice(["Type1", "Type2", "Type3"])))
    .withColumnSpec("Order_id", minValue=1, maxValue=10000, step=1)
    .withColumnSpec("Return_reason", text=PyfuncText(lambda context, v: random.choice(["Reason1", "Reason2", "Reason3"])))
    .withColumnSpec("Tracking_no",  text=PyfuncText(lambda context, v: Faker().word()))
    .withColumnSpec("Order_Date", "timestamp", data_range=DateRange("2010-01-01 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
    .withColumnSpec("Order_amount", random=True , text=PyfuncText(lambda context, v: str(round(random.uniform(100, 1000), 2))))
    .withColumnSpec("Status", text=PyfuncText(lambda context, v: random.choice(["Status1", "Status2", "Status3"])))
    .withColumnSpec("Category", text=PyfuncText(lambda context, v: random.choice(["Category1", "Category2", "Category3"])))
    .withColumnSpec("Confrimed_Date", "timestamp", data_range=DateRange("2010-01-01 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
    .withColumnSpec("Return_Close_Date", "timestamp", data_range=DateRange("2010-01-01 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
    .withColumnSpec("logistics_name", text=PyfuncText(lambda context, v: random.choice(["Logistics1", "Logistics2", "Logistics3"])))
    .withColumnSpec("refund_amount", random=True , text=PyfuncText(lambda context, v: str(round(random.uniform(100, 1000), 2))))
    .withColumnSpec("reverse_retum_tracking_number",  text=PyfuncText(lambda context, v: Faker().word()))
    .withColumnSpec("ext_return_no",  text=PyfuncText(lambda context, v: Faker().word()))
    .withColumnSpec("invoice_no", text=PyfuncText(lambda context, v: Faker().word()))
    .withColumnSpec("fulfillment_type", text=PyfuncText(lambda context, v: random.choice(["Fulfillment1", "Fulfillment2", "Fulfillment3"])))
    .withColumnSpec("pincode", minValue=100000, maxValue=999999, step=1)
    .withColumnSpec("platform", text=PyfuncText(lambda context, v: random.choice(["Platform1", "Platform2", "Platform3"])))
    .withColumnSpec("label_cost", random=True , text=PyfuncText(lambda context, v: str(round(random.uniform(100, 1000), 2))))
    .withColumnSpec("label_type", text=PyfuncText(lambda context, v: random.choice(["LabelType1", "LabelType2", "LabelType3"])))
    .withColumnSpec("label_to_be_paid_by", text=PyfuncText(lambda context, v: random.choice(["PaidBy1", "PaidBy2", "PaidBy3"])))
    .withColumnSpec("Return_Delivery_Date", "timestamp", data_range=DateRange("2010-01-01 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
    .withColumnSpec("style_id", minValue=1000, maxValue=9999, step=1)
    .withColumnSpec("EAN",  text=PyfuncText(lambda context, v: Faker().ean13()))
    .withColumnSpec("Company", text=PyfuncText(lambda context, v: random.choice(["Company1", "Company2", "Company3"])))
    .withColumnSpec("3pl_delivery_Status", text=PyfuncText(lambda context, v: random.choice(["Status1", "Status2", "Status3"])))
    .withColumnSpec("FWD_seller_Order_ID",  text=PyfuncText(lambda context, v: Faker().word()))
    .withColumnSpec("FWD_PO_NO",  text=PyfuncText(lambda context, v: Faker().word()))
    .withColumnSpec("FWD_PO_DATE", "timestamp", data_range=DateRange("2010-01-01 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
    .withColumnSpec("FWD_B2B_INVOICE_No",  text=PyfuncText(lambda context, v: Faker().word()))
    .withColumnSpec("FWD_B2B_INVOICE_Date", "timestamp", data_range=DateRange("2010-01-01 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
    # FWD_B2B_Invoice_Amt is DoubleType in the schema, so it is generated numerically.
    # With a string text generator here the sample output contained 0.0, 1.0, 2.0, ...
    # instead of amounts in the intended 100-1000 range.
    .withColumnSpec("FWD_B2B_Invoice_Amt", minValue=100.0, maxValue=1000.0, step=0.01, random=True)
    .withColumnSpec("FWD_Carrier_Name", text=PyfuncText(lambda context, v: random.choice(["Carrier1", "Carrier2", "Carrier3"])))
    .withColumnSpec("FWD_AWB",  text=PyfuncText(lambda context, v: Faker().word()))
    .withColumnSpec("warehouse_id", minValue=1, maxValue=10, step=1)
    .withColumnSpec("mytra_sku_code",  text=PyfuncText(lambda context, v: Faker().word()))
    .withColumnSpec("exchange_id", minValue=1, maxValue=10, step=1)
    .withColumnSpec("store_packet_id", minValue=1, maxValue=10, step=1)
    .withColumnSpec("master_bag_id", minValue=1, maxValue=10, step=1)
    .withColumnSpec("Imdo_status", text=PyfuncText(lambda context, v: random.choice(["Status1", "Status2", "Status3"])))
    .withColumnSpec("Imdo_modified_last_on", "timestamp", data_range=DateRange("2010-01-01 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
    .withColumnSpec("ShippingMethod", text=PyfuncText(lambda context, v: random.choice(["ShippingMethod1", "ShippingMethod2", "ShippingMethod3"])))
    .withColumnSpec("ManifestByDate", "timestamp", data_range=DateRange("2010-01-01 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
    .withColumnSpec("ManifestByCode", text=PyfuncText(lambda context, v: random.choice(["Code1", "Code2", "Code3"])))
    .withColumnSpec("Inbound_No",  text=PyfuncText(lambda context, v: Faker().word()))
    .withColumnSpec("GRN_No",  text=PyfuncText(lambda context, v: Faker().word()))
    .withColumnSpec("GRN_Date", "timestamp", data_range=DateRange("2010-01-01 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
    .withColumnSpec("GRN_Value", random=True , text=PyfuncText(lambda context, v: str(round(random.uniform(100, 1000), 2))))
    .withColumnSpec("Damage_Qty", minValue=1, maxValue=100, step=1)
    .withColumnSpec("item_id", minValue=1, maxValue=5000, step=1)
    .withColumnSpec("created_at", "timestamp", data_range=DateRange("2010-01-01 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
    # updated_at is drawn independently of created_at; its range merely starts one day later.
    .withColumnSpec("updated_at", "timestamp", data_range=DateRange("2010-01-02 00:00:00", "2023-12-31 23:59:59", "days=1"), random=True)
)

# Build the synthetic data DataFrame.
# created_at and updated_at are generated independently, so a raw row can have
# updated_at earlier than created_at. Enforce updated_at >= created_at by
# taking the later of the two timestamps for updated_at.
item_data = item_data_generator.build().withColumn(
    "updated_at", expr("greatest(created_at, updated_at)")
)

# Display the first 10 rows of the generated data
display(item_data.limit(10))


brand,order_item_id,sku_id,sub_brand,channel_name,asin,fsn,return_reason_code,return_id,Return_Created_Date,Return_Type,Order_id,Return_reason,Tracking_no,Order_Date,Order_amount,Status,Category,Confrimed_Date,Return_Close_Date,logistics_name,refund_amount,reverse_retum_tracking_number,ext_return_no,invoice_no,fulfillment_type,pincode,platform,label_cost,label_type,label_to_be_paid_by,Return_Delivery_Date,style_id,EAN,Company,3pl_delivery_Status,FWD_seller_Order_ID,FWD_PO_NO,FWD_PO_DATE,FWD_B2B_INVOICE_No,FWD_B2B_INVOICE_Date,FWD_B2B_Invoice_Amt,FWD_Carrier_Name,FWD_AWB,warehouse_id,mytra_sku_code,exchange_id,store_packet_id,master_bag_id,Imdo_status,Imdo_modified_last_on,ShippingMethod,ManifestByDate,ManifestByCode,Inbound_No,GRN_No,GRN_Date,GRN_Value,Damage_Qty,item_id,created_at,updated_at
b2,1,1000,sb1,Channel2,1296604668039,25640724,but,1000,2021-12-05T00:00:00.000+0000,Type3,1,Reason3,long,2015-06-06T00:00:00.000+0000,222.24,Status3,Category2,2018-11-11T00:00:00.000+0000,2013-04-25T00:00:00.000+0000,Logistics1,448.6,seem,those,cultural,Fulfillment3,100000,Platform3,201.74,LabelType1,PaidBy3,2012-11-04T00:00:00.000+0000,1000,8189221459117,Company1,Status3,security,create,2021-02-06T00:00:00.000+0000,century,2022-07-23T00:00:00.000+0000,0.0,Carrier3,cell,1,above,1,1,1,Status1,2010-04-27T00:00:00.000+0000,ShippingMethod3,2011-01-31T00:00:00.000+0000,Code3,boy,treatment,2012-03-27T00:00:00.000+0000,611.52,1,1,2012-03-20T00:00:00.000+0000,2013-07-28T00:00:00.000+0000
b1,2,1001,sb2,Channel1,1602778026603,1768428,matter,1001,2016-12-03T00:00:00.000+0000,Type1,2,Reason3,huge,2014-08-10T00:00:00.000+0000,669.51,Status3,Category1,2016-09-06T00:00:00.000+0000,2015-08-31T00:00:00.000+0000,Logistics2,504.55,church,in,cover,Fulfillment2,100001,Platform2,373.16,LabelType1,PaidBy3,2019-08-06T00:00:00.000+0000,1001,2694254727509,Company3,Status3,why,economic,2015-06-29T00:00:00.000+0000,place,2022-05-30T00:00:00.000+0000,1.0,Carrier2,discuss,2,prevent,2,2,2,Status3,2019-11-05T00:00:00.000+0000,ShippingMethod2,2018-02-04T00:00:00.000+0000,Code3,old,among,2022-09-02T00:00:00.000+0000,623.47,2,2,2012-12-10T00:00:00.000+0000,2013-07-06T00:00:00.000+0000
b2,3,1002,sb3,Channel1,4258314426283,93801324,arrive,1002,2020-11-24T00:00:00.000+0000,Type2,3,Reason3,his,2023-05-11T00:00:00.000+0000,805.73,Status1,Category2,2019-07-23T00:00:00.000+0000,2018-10-22T00:00:00.000+0000,Logistics2,888.24,collection,story,collection,Fulfillment1,100002,Platform1,641.09,LabelType3,PaidBy3,2023-09-19T00:00:00.000+0000,1002,8483929946963,Company3,Status1,on,want,2019-07-19T00:00:00.000+0000,establish,2016-04-27T00:00:00.000+0000,2.0,Carrier1,anything,3,region,3,3,3,Status1,2018-08-28T00:00:00.000+0000,ShippingMethod2,2010-09-10T00:00:00.000+0000,Code3,economic,music,2016-10-11T00:00:00.000+0000,598.51,3,3,2021-10-09T00:00:00.000+0000,2012-08-05T00:00:00.000+0000
b1,4,1003,sb1,Channel1,957039930658,8462633,value,1003,2017-06-04T00:00:00.000+0000,Type1,4,Reason3,bit,2017-07-09T00:00:00.000+0000,650.47,Status1,Category3,2022-08-07T00:00:00.000+0000,2011-01-22T00:00:00.000+0000,Logistics2,388.2,administration,activity,full,Fulfillment3,100003,Platform3,692.21,LabelType1,PaidBy3,2015-07-03T00:00:00.000+0000,1003,2751107383715,Company1,Status3,short,card,2023-02-17T00:00:00.000+0000,consider,2011-01-28T00:00:00.000+0000,3.0,Carrier1,peace,4,ever,4,4,4,Status1,2020-09-01T00:00:00.000+0000,ShippingMethod3,2016-03-01T00:00:00.000+0000,Code3,condition,town,2018-12-22T00:00:00.000+0000,206.69,4,4,2021-02-12T00:00:00.000+0000,2012-11-29T00:00:00.000+0000
b1,5,1004,sb2,Channel1,4835999039191,57955278,per,1004,2016-07-11T00:00:00.000+0000,Type3,5,Reason3,capital,2022-12-13T00:00:00.000+0000,203.51,Status2,Category2,2018-06-29T00:00:00.000+0000,2012-07-22T00:00:00.000+0000,Logistics1,438.31,recognize,dinner,ability,Fulfillment2,100004,Platform1,414.56,LabelType2,PaidBy1,2013-05-30T00:00:00.000+0000,1004,6492541380019,Company2,Status3,win,people,2019-06-14T00:00:00.000+0000,along,2010-09-15T00:00:00.000+0000,4.0,Carrier3,modern,5,forget,5,5,5,Status3,2012-12-09T00:00:00.000+0000,ShippingMethod2,2010-05-30T00:00:00.000+0000,Code2,machine,cell,2010-02-16T00:00:00.000+0000,202.95,5,5,2010-02-09T00:00:00.000+0000,2022-12-05T00:00:00.000+0000
b2,6,1005,sb3,Channel1,473187428224,32308389,relationship,1005,2015-01-03T00:00:00.000+0000,Type2,6,Reason1,best,2011-05-11T00:00:00.000+0000,357.34,Status3,Category1,2019-04-20T00:00:00.000+0000,2023-08-10T00:00:00.000+0000,Logistics1,932.35,language,board,account,Fulfillment3,100005,Platform3,521.63,LabelType1,PaidBy1,2016-10-27T00:00:00.000+0000,1005,3419051919426,Company3,Status1,effort,head,2017-08-17T00:00:00.000+0000,character,2018-09-25T00:00:00.000+0000,5.0,Carrier2,process,6,scene,6,6,6,Status3,2018-08-05T00:00:00.000+0000,ShippingMethod1,2015-03-08T00:00:00.000+0000,Code3,attorney,along,2020-08-01T00:00:00.000+0000,106.78,6,6,2019-10-02T00:00:00.000+0000,2015-02-14T00:00:00.000+0000
b1,7,1006,sb2,Channel1,1062929987625,31446945,budget,1006,2015-12-18T00:00:00.000+0000,Type2,7,Reason1,fill,2014-01-04T00:00:00.000+0000,694.13,Status2,Category1,2019-01-29T00:00:00.000+0000,2013-01-28T00:00:00.000+0000,Logistics3,949.61,development,each,support,Fulfillment2,100006,Platform1,921.75,LabelType2,PaidBy1,2022-06-20T00:00:00.000+0000,1006,589512059381,Company3,Status3,lot,sort,2016-12-09T00:00:00.000+0000,send,2022-12-14T00:00:00.000+0000,6.0,Carrier2,smile,7,not,7,7,7,Status3,2023-09-13T00:00:00.000+0000,ShippingMethod3,2023-08-18T00:00:00.000+0000,Code3,allow,pretty,2016-07-06T00:00:00.000+0000,284.96,7,7,2017-02-27T00:00:00.000+0000,2017-11-02T00:00:00.000+0000
b1,8,1007,sb1,Channel3,5461875638128,2909837,recently,1007,2013-11-19T00:00:00.000+0000,Type1,8,Reason2,close,2011-11-26T00:00:00.000+0000,824.7,Status2,Category2,2023-03-30T00:00:00.000+0000,2023-04-15T00:00:00.000+0000,Logistics1,738.2,growth,including,measure,Fulfillment1,100007,Platform2,243.6,LabelType2,PaidBy1,2012-05-08T00:00:00.000+0000,1007,6174743176224,Company1,Status1,will,view,2019-10-14T00:00:00.000+0000,at,2018-03-11T00:00:00.000+0000,7.0,Carrier2,whatever,8,trip,8,8,8,Status1,2011-05-18T00:00:00.000+0000,ShippingMethod2,2022-06-02T00:00:00.000+0000,Code2,upon,pressure,2011-03-24T00:00:00.000+0000,435.52,8,8,2019-11-16T00:00:00.000+0000,2022-07-21T00:00:00.000+0000
b1,9,1008,sb3,Channel2,9888400137592,67157181,positive,1008,2015-12-24T00:00:00.000+0000,Type2,9,Reason2,society,2012-06-18T00:00:00.000+0000,942.21,Status1,Category3,2020-07-06T00:00:00.000+0000,2013-07-29T00:00:00.000+0000,Logistics2,683.0,wait,night,from,Fulfillment2,100008,Platform3,799.16,LabelType1,PaidBy1,2012-02-07T00:00:00.000+0000,1008,1582470811434,Company2,Status2,there,center,2021-10-30T00:00:00.000+0000,interest,2016-07-18T00:00:00.000+0000,8.0,Carrier3,often,9,manager,9,9,9,Status3,2022-05-06T00:00:00.000+0000,ShippingMethod2,2019-10-07T00:00:00.000+0000,Code3,owner,difficult,2015-12-18T00:00:00.000+0000,182.13,9,9,2012-06-15T00:00:00.000+0000,2010-05-28T00:00:00.000+0000
b2,10,1009,sb1,Channel2,6096264228278,87683455,want,1009,2014-09-12T00:00:00.000+0000,Type1,10,Reason3,finish,2018-10-07T00:00:00.000+0000,731.14,Status3,Category3,2012-02-27T00:00:00.000+0000,2014-07-29T00:00:00.000+0000,Logistics1,613.71,employee,choose,foot,Fulfillment3,100009,Platform1,750.97,LabelType1,PaidBy1,2012-10-06T00:00:00.000+0000,1009,5352371283595,Company1,Status3,modern,name,2016-02-04T00:00:00.000+0000,room,2013-04-05T00:00:00.000+0000,9.0,Carrier2,certainly,10,city,10,10,10,Status3,2012-06-06T00:00:00.000+0000,ShippingMethod2,2020-11-21T00:00:00.000+0000,Code1,rock,everybody,2020-01-14T00:00:00.000+0000,821.68,10,10,2011-12-22T00:00:00.000+0000,2018-02-06T00:00:00.000+0000
