# Data Processing Problem 2


## Author :- Anuja Jain 
## Date :- 22 Apr 2021 
## Distributed Platform :-  PySpark

### Import all the required libraries

In [1]:
import os
import sys
import findspark
import time

# Initialize Findspark to locate Pyspark installed in the system
findspark.init()

# Import Pyspark libraries and functions
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import concat, col, lit, udf, when
from pyspark.sql.types import ArrayType, IntegerType, StringType , DateType

# Import Faker (3rd party library) used to generate anonymized data
from faker import Factory


### Setup Spark Context and set the master as local for standalone cluster

In [2]:
sc = SparkContext(conf=SparkConf().setAppName("LFApp").setMaster("local"))
sqlc=SQLContext(sc) 

### Read the employee data  (Downloaded from : http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/ )

In [3]:
df = sqlc.read.csv('./Dataset/Records5m.csv', header = True)

In [4]:
# Find the number of rows
df.count()

5000000

In [5]:
# Dataset columns
df.columns

['Emp ID',
 'Name Prefix',
 'First Name',
 'Middle Initial',
 'Last Name',
 'Gender',
 'E Mail',
 "Father's Name",
 "Mother's Name",
 "Mother's Maiden Name",
 'Date of Birth',
 'Time of Birth',
 'Age in Yrs.',
 'Weight in Kgs.',
 'Date of Joining',
 'Quarter of Joining',
 'Half of Joining',
 'Year of Joining',
 'Month of Joining',
 'Month Name of Joining',
 'Short Month',
 'Day of Joining',
 'DOW of Joining',
 'Short DOW',
 'Age in Company (Years)',
 'Salary',
 'Last % Hike',
 'SSN',
 'Phone No. ',
 'Place Name',
 'County',
 'City',
 'State',
 'Zip',
 'Region',
 'User Name',
 'Password']

In [6]:
# Showcasing first few rows 
df.show(5)

+------+-----------+----------+--------------+---------+------+--------------------+---------------+------------------+--------------------+-------------+-------------+-----------+--------------+---------------+------------------+---------------+---------------+----------------+---------------------+-----------+--------------+--------------+---------+----------------------+------+-----------+-----------+------------+----------+--------------------+----------+-----+-----+-------+----------+--------------+
|Emp ID|Name Prefix|First Name|Middle Initial|Last Name|Gender|              E Mail|  Father's Name|     Mother's Name|Mother's Maiden Name|Date of Birth|Time of Birth|Age in Yrs.|Weight in Kgs.|Date of Joining|Quarter of Joining|Half of Joining|Year of Joining|Month of Joining|Month Name of Joining|Short Month|Day of Joining|DOW of Joining|Short DOW|Age in Company (Years)|Salary|Last % Hike|        SSN|  Phone No. |Place Name|              County|      City|State|  Zip| Region| User N

### Generate an address column by combining Place, country, city, state and zip 

In [7]:
df = df.withColumn('Address', concat(col('Place Name'), lit(', '), col('County'),
                                     col('City'), lit(', '),col('State'),lit(', '),col('Zip')))

### select only required columns

In [8]:
df_selc = df.select('Emp ID','First Name','Last Name','Date of Birth','Address', 'Gender')

In [9]:
# Finding the size of the data in memory
subset_size = sys.getsizeof(df_selc.limit(100).toPandas())
nrows = df_selc.count()
print("size in memory: {:.3f} GB".format(subset_size/100*nrows/1E9))

size in memory: 2.044 GB


In [10]:
df_selc.show(10)

+------+----------+---------+-------------+--------------------+------+
|Emp ID|First Name|Last Name|Date of Birth|             Address|Gender|
+------+----------+---------+-------------+--------------------+------+
|742048|    Lizeth|   Mccoll|     7/5/1973|Alliance, StarkAl...|     F|
|671135| Argentina|     Hern|   10/15/1962|Washington, Distr...|     F|
|965851|    Damian|  Patillo|    3/11/1975|Burrel, FresnoBur...|     M|
|224660|   Imogene| Hagopian|    2/19/1995|Bryan, BrazosBrya...|     F|
|852694|    Walker|  Wallach|     8/5/1992|Newllano, VernonN...|     M|
|144102|  Jesusita|   Hollie|   10/30/1987|Topanga, Los Ange...|     F|
|687793|    Fausto|  Esqueda|    4/23/1971|Somerdale, Tuscar...|     M|
|636308|     Vanda|    Komar|     9/2/1962|Akron, SummitAkro...|     F|
|218660|   Destiny|Nicholson|     7/1/1995|Atlantic City, At...|     F|
|465691|      Evie|    Hamby|    6/22/1980|Rockaway, MorrisR...|     F|
+------+----------+---------+-------------+--------------------+

### Create Pyspark UDF for anonymizing required columns by generating similar fake columns . 
####     Here , we filter out male and female employees and anonymize them seperately to maintain the gender of the fake name

In [11]:
#pyspark UDF
faker  = Factory.create()
anonymize_male = udf(lambda n : faker.first_name_male(), StringType())
anonymize_female = udf(lambda n : faker.first_name_female(), StringType())
anonymize_female_lst = udf(lambda n : faker.last_name_female(), StringType())
anonymize_male_lst = udf(lambda n : faker.last_name_male(), StringType())
anonymize_addr = udf(lambda n: faker.address(), StringType())

In [12]:
# Create a new dataframe that has both the original and fake columns for comparison
start = time.time()

df_fake = df_selc.withColumn("Fake_First_name", when(col("Gender") == "M",anonymize_male(col("First Name")))
      .when(col("Gender") == "F",anonymize_female(col("First Name")))) \
     .withColumn("Fake_Last_name", when(col("Gender") == "M",anonymize_male_lst(col("Last Name")))
      .when(col("Gender") == "F",anonymize_female_lst(col("First Name")))) \
    .withColumn("Fake Addrs", anonymize_addr(col("Address")))
end = time.time()
print ("Total Processing Time (in seconds) to anonymize First name, Last Name and address for 2GB data: ",end-start)

Total Processing Time (in seconds) to anonymize First name, Last Name and address for 2GB data:  0.18032479286193848


In [13]:
df_fake.show(10)

+------+----------+---------+-------------+--------------------+------+---------------+--------------+--------------------+
|Emp ID|First Name|Last Name|Date of Birth|             Address|Gender|Fake_First_name|Fake_Last_name|          Fake Addrs|
+------+----------+---------+-------------+--------------------+------+---------------+--------------+--------------------+
|742048|    Lizeth|   Mccoll|     7/5/1973|Alliance, StarkAl...|     F|          Erica|        Morris|0413 Laura Island...|
|671135| Argentina|     Hern|   10/15/1962|Washington, Distr...|     F|        Darlene|          King|222 Smith Land
Ea...|
|965851|    Damian|  Patillo|    3/11/1975|Burrel, FresnoBur...|     M|         Olivia|          Wolf|561 White Plain S...|
|224660|   Imogene| Hagopian|    2/19/1995|Bryan, BrazosBrya...|     F|         Amanda|       Johnson|7983 Dylan Groves...|
|852694|    Walker|  Wallach|     8/5/1992|Newllano, VernonN...|     M|        Rebecca|       Salinas|66816 Tammy Avenu...|
|144102|

## Select the required columns and rename them as original columns to maintain anonymity

In [14]:
df_final = df_fake.select('Emp ID','Fake_First_name','Fake_Last_name','Date of Birth','Fake Addrs')

In [15]:
df_final = df_final.withColumnRenamed('Fake_First_name','First Name').withColumnRenamed('Fake_Last_name','Last Name') \
            .withColumnRenamed('Fake Addrs','Address')

In [16]:
df_final.show(10)

+------+----------+---------+-------------+--------------------+
|Emp ID|First Name|Last Name|Date of Birth|             Address|
+------+----------+---------+-------------+--------------------+
|742048|     Erica|   Morris|     7/5/1973|0413 Laura Island...|
|671135|   Darlene|     King|   10/15/1962|222 Smith Land
Ea...|
|965851|    Olivia|     Wolf|    3/11/1975|561 White Plain S...|
|224660|    Amanda|  Johnson|    2/19/1995|7983 Dylan Groves...|
|852694|   Rebecca|  Salinas|     8/5/1992|66816 Tammy Avenu...|
|144102|  Michelle| Mckinney|   10/30/1987|197 Wu Gateway Ap...|
|687793|    Andrea|    Jones|    4/23/1971|7643 Cassandra In...|
|636308|    Cheryl|      Lee|     9/2/1962|15012 Nancy Stree...|
|218660|   Rebecca|    Hogan|     7/1/1995|63521 Lyons Villa...|
|465691|     Emily|  Stewart|    6/22/1980|417 Myers Hollow
...|
+------+----------+---------+-------------+--------------------+
only showing top 10 rows

