In [None]:
%pip install faker

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
Collecting faker
  Obtaining dependency information for faker from https://files.pythonhosted.org/packages/d6/a9/3bdbd257f7aa3cb971bbf8c688827532ecfe6448168d211cb63b942f6431/Faker-28.4.1-py3-none-any.whl.metadata
  Using cached Faker-28.4.1-py3-none-any.whl.metadata (15 kB)
Using cached Faker-28.4.1-py3-none-any.whl (1.8 MB)
Installing collected packages: faker
Successfully installed faker-28.4.1
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [None]:
import pandas as pd
from faker import Faker
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, udf
from pyspark.sql.types import DateType, StringType

# Build (or reuse) the Spark session that every later cell relies on.
# NOTE(review): if a session already exists, getOrCreate() returns it and the
# executor settings below may not take effect — confirm on the target cluster.
spark = (
    SparkSession.builder
    .appName("GenerateFakeData")
    .config("spark.sql.shuffle.partitions", "500")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "16")
    .getOrCreate()
)

In [None]:

# Module-level Faker instance shared by the generate_* helpers below
fake = Faker()

# UDFs for generating fake data
def generate_first_name():
    """Produce one random first name from the shared Faker instance."""
    first_name = fake.first_name()
    return first_name

def generate_last_name():
    """Produce one random last name from the shared Faker instance."""
    last_name = fake.last_name()
    return last_name

def generate_address():
    """Produce one random postal address from the shared Faker instance."""
    address = fake.address()
    return address

def generate_dob():
    """Produce a random date of birth (age 18 to 90) as a YYYY-MM-DD string."""
    birth_date = fake.date_of_birth(minimum_age=18, maximum_age=90)
    return birth_date.strftime("%Y-%m-%d")

# Wrap each generator as a string-returning Spark UDF.
# The generators return random values, so every UDF is marked non-deterministic:
# Spark assumes UDFs are deterministic by default and may eliminate or repeat
# invocations while optimizing the query plan.
first_name_udf = udf(generate_first_name, StringType()).asNondeterministic()
last_name_udf = udf(generate_last_name, StringType()).asNondeterministic()
address_udf = udf(generate_address, StringType()).asNondeterministic()
dob_udf = udf(generate_dob, StringType()).asNondeterministic()

In [None]:
# The four *_udf wrappers are registered in the previous cell; the copy that used
# to sit here only shadowed those definitions, so it has been dropped.

# Create a DataFrame with the required number of rows
num_records = 100  # Adjust based on the size needed

# spark.range() supplies one row per record; each column is filled by a zero-argument UDF
df = spark.range(num_records).select(
    first_name_udf().alias("first_name"),
    last_name_udf().alias("last_name"),
    address_udf().alias("address"),
    dob_udf().alias("dob"),
)



In [None]:
# Spread the rows over 100 partitions before writing (raise the count if necessary)
df = df.repartition(100)

# DBFS destination for the generated CSV output
output_path = "dbfs:/dbfs/Workspace/Users/kratikamahale_outlook.com#ext#@kratikamahaleoutlook.onmicrosoft.com/databricks_training/output2.csv"

# Persist with a header row, replacing whatever a previous run left at the path
df.write.option("header", True).mode("overwrite").csv(output_path)

print(f"Data saved at: {output_path}")


Data saved at: dbfs:/dbfs/Workspace/Users/kratikamahale_outlook.com#ext#@kratikamahaleoutlook.onmicrosoft.com/databricks_training/output2.csv


In [None]:
# Read the generated CSV back from DBFS (output_path is set in the write cell).
# The generated addresses contain an embedded newline inside a quoted field;
# multiLine=True makes the reader keep such a field as one value instead of
# splitting the record at the newline, which shifted columns and left dob NULL.
df = spark.read.csv(output_path, header=True, inferSchema=True, multiLine=True)

# Define the masking functions
def mask_name(name):
    """Mask a name, leaving only its first character visible.

    Args:
        name: The name to mask. May be None (e.g. a NULL column value) or empty.

    Returns:
        The first character followed by one "*" for every remaining character.
        None and "" are returned unchanged.
    """
    if not name:
        # Nothing to keep visible: indexing name[0] would raise on None or ""
        return name
    return name[0] + "*" * (len(name) - 1)  # Mask all but the first character

def mask_address(address):
    """Return a fixed placeholder in place of any address; the input is ignored."""
    placeholder = "Address Masked"
    return placeholder

# Register UDFs for masking
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Expose the masking helpers to Spark as string-returning UDFs
mask_name_udf = udf(mask_name, StringType())
mask_address_udf = udf(mask_address, StringType())

# Overwrite each sensitive column with its masked counterpart
masked_df = (
    df
    .withColumn("first_name", mask_name_udf(col("first_name")))
    .withColumn("last_name", mask_name_udf(col("last_name")))
    .withColumn("address", mask_address_udf(col("address")))
)
masked_df.show(50)


+------------------+----------+--------------+----+
|        first_name| last_name|       address| dob|
+------------------+----------+--------------+----+
|             J****|   M******|Address Masked|NULL|
|  E***************| *********|Address Masked|NULL|
|             A****|     S****|Address Masked|NULL|
|   N**************| *********|Address Masked|NULL|
|            M*****|     M****|Address Masked|NULL|
|         P********| *********|Address Masked|NULL|
|           K******|   M******|Address Masked|NULL|
|       P**********| *********|Address Masked|NULL|
|          K*******|    K*****|Address Masked|NULL|
|           A******| *********|Address Masked|NULL|
|            L*****|   L******|Address Masked|NULL|
|         A********| *********|Address Masked|NULL|
|            J*****|  C*******|Address Masked|NULL|
|        W*********| *********|Address Masked|NULL|
|            S*****|       K**|Address Masked|NULL|
|       E**********| *********|Address Masked|NULL|
|           

In [None]:
# List the files Spark wrote under the CSV output directory, with their sizes
dbutils.fs.ls(output_path)


[FileInfo(path='dbfs:/dbfs/Workspace/Users/kratikamahale_outlook.com#ext#@kratikamahaleoutlook.onmicrosoft.com/databricks_training/output2.csv/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1725546055000),
 FileInfo(path='dbfs:/dbfs/Workspace/Users/kratikamahale_outlook.com#ext#@kratikamahaleoutlook.onmicrosoft.com/databricks_training/output2.csv/_committed_9154164020424405091', name='_committed_9154164020424405091', size=8836, modificationTime=1725546055000),
 FileInfo(path='dbfs:/dbfs/Workspace/Users/kratikamahale_outlook.com#ext#@kratikamahaleoutlook.onmicrosoft.com/databricks_training/output2.csv/_started_9154164020424405091', name='_started_9154164020424405091', size=0, modificationTime=1725546040000),
 FileInfo(path='dbfs:/dbfs/Workspace/Users/kratikamahale_outlook.com#ext#@kratikamahaleoutlook.onmicrosoft.com/databricks_training/output2.csv/part-00000-tid-9154164020424405091-3fa568e6-f0eb-4c01-95be-e2b0fbfebdcf-12-1-c000.csv', name='part-00000-tid-9154164020424405091-3fa56

In [None]:
# Re-read the generated CSV from the path set in the write cell.
# multiLine=True keeps quoted addresses that contain a newline in a single record.
df2 = spark.read.csv(output_path, header=True, inferSchema=True, multiLine=True)
# Collapse to one partition so the export below is written as a single part file
df2 = df2.coalesce(1)

In [None]:
# Write to the local file system under /tmp.
# mode="overwrite" keeps this cell re-runnable: the default save mode raises an
# error once the target directory exists from an earlier run.
# NOTE(review): with a file:/ path the part file lands on whichever node runs the
# write task — confirm that this is the driver on the target cluster.
df2.write.csv("file:/tmp/local_csv_files/", header=True, mode="overwrite")


In [None]:
# Optional: mirror the locally written CSV directory into DBFS so the files can
# afterwards be downloaded through the Databricks UI.
local_csv_dir = "file:/tmp/local_csv_files/"
dbfs_csv_dir = "dbfs:/tmp/local_csv_files/"
dbutils.fs.cp(local_csv_dir, dbfs_csv_dir, recurse=True)


True

In [None]:
# Display the content of the exported part file.
# The part file name embeds an id that changes on every write, so it is looked up
# instead of being hard-coded; the most recently modified part file is shown in
# case copies from earlier runs are still present in the directory.
csv_part_files = [
    info
    for info in dbutils.fs.ls("dbfs:/tmp/local_csv_files/")
    if info.name.startswith("part-") and info.name.endswith(".csv")
]
if not csv_part_files:
    raise FileNotFoundError("No part-*.csv file found under dbfs:/tmp/local_csv_files/")
latest_part_file = max(csv_part_files, key=lambda info: info.modificationTime)
dbutils.fs.head(latest_part_file.path)


[Truncated to first 65536 bytes]


'first_name,last_name,address,dob\nJulia,Manning,"1658 Ethan Inlet Apt. 420",\nEast Miguelmouth,"DC 46454\\"",1987-12-18,\nAaron,Smith,"50606 Brown Mill Apt. 796",\nNorth Susanside,"FM 07826\\"",1933-09-14,\nMorgan,Marks,"7576 Carlos Wall",\nPort Anna,"MI 05114\\"",1997-06-09,\nKristen,Mueller,"37272 Simmons Mountain",\nPearsonfort,"IL 92091\\"",1990-12-08,\nKristina,Kramer,"92762 Debbie Plains",\nAmyberg,"NJ 19321\\"",1991-03-28,\nLauren,Lindsey,"57542 Vincent Cove Suite 735",\nAllenstad,"SD 93518\\"",1991-12-28,\nJustin,Campbell,"4972 Michael Forge",\nWest Jacob,"NM 20089\\"",1954-05-13,\nSamuel,Kim,"95149 Lori Falls Apt. 880",\nEast Angela,"IL 09757\\"",1965-07-23,\nRachel,Santana,"655 May Heights",\nEast Jessicabury,"SC 25375\\"",1999-06-19,\nMargaret,Wilcox,"84418 Marshall Shores Apt. 205",\nNorth Patriciatown,"SC 04210\\"",1994-01-26,\nRose,Herman,"93806 King Plains Apt. 232",\nWest Tina,"KY 96678\\"",2000-06-27,\nScott,Gonzales,"621 Kathy Ridges Suite 388",\nManningbury,"CT 2256