# Preprocessing using pyspark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # install Java8
!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz # download spark 3.1.1
!tar xf spark-3.1.1-bin-hadoop2.7.tgz # unzip it
!pip install -q findspark # install findspark
!pip install pyspark==3.1.1 # install pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession



In [2]:
spark = SparkSession.builder.master("local").config(conf = SparkConf()).getOrCreate()
spark

### Read data and perform basic operations

In [3]:
df = spark.read.csv(path = "/content/employee.txt", header = True, inferSchema = True)
df

DataFrame[id: int, last_name: string, email: string, gender: string, department: string, start_date: string, salary: int, job_title: string, region_id: int]

In [4]:
df.schema

StructType(List(StructField(id,IntegerType,true),StructField(last_name,StringType,true),StructField(email,StringType,true),StructField(gender,StringType,true),StructField(department,StringType,true),StructField(start_date,StringType,true),StructField(salary,IntegerType,true),StructField(job_title,StringType,true),StructField(region_id,IntegerType,true)))

In [5]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- department: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- job_title: string (nullable = true)
 |-- region_id: integer (nullable = true)



In [6]:
df.columns

['id',
 'last_name',
 'email',
 'gender',
 'department',
 'start_date',
 'salary',
 'job_title',
 'region_id']

In [7]:
df.take(5)

[Row(id=1, last_name="'Kelley'", email="'rkelley0@soundcloud.com'", gender="'Female'", department="'Computers'", start_date="'10/2/2009'", salary=67470, job_title="'Structural Engineer'", region_id=2),
 Row(id=2, last_name="'Armstrong'", email="'sarmstrong1@infoseek.co.jp'", gender="'Male'", department="'Sports'", start_date="'3/31/2008'", salary=71869, job_title="'Financial Advisor'", region_id=2),
 Row(id=3, last_name="'Carr'", email="'fcarr2@woothemes.com'", gender="'Male'", department="'Automotive'", start_date="'7/12/2009'", salary=101768, job_title="'Recruiting Manager'", region_id=3),
 Row(id=4, last_name="'Murray'", email="'jmurray3@gov.uk'", gender="'Female'", department="'Jewelery'", start_date="'12/25/2014'", salary=96897, job_title="'Desktop Support Technician'", region_id=3),
 Row(id=5, last_name="'Ellis'", email="'jellis4@sciencedirect.com'", gender="'Female'", department="'Grocery'", start_date="'9/19/2002'", salary=63702, job_title="'Software Engineer III'", region_id=7

In [8]:
type(df.take(5)[0])

In [9]:
df.head(5)

[Row(id=1, last_name="'Kelley'", email="'rkelley0@soundcloud.com'", gender="'Female'", department="'Computers'", start_date="'10/2/2009'", salary=67470, job_title="'Structural Engineer'", region_id=2),
 Row(id=2, last_name="'Armstrong'", email="'sarmstrong1@infoseek.co.jp'", gender="'Male'", department="'Sports'", start_date="'3/31/2008'", salary=71869, job_title="'Financial Advisor'", region_id=2),
 Row(id=3, last_name="'Carr'", email="'fcarr2@woothemes.com'", gender="'Male'", department="'Automotive'", start_date="'7/12/2009'", salary=101768, job_title="'Recruiting Manager'", region_id=3),
 Row(id=4, last_name="'Murray'", email="'jmurray3@gov.uk'", gender="'Female'", department="'Jewelery'", start_date="'12/25/2014'", salary=96897, job_title="'Desktop Support Technician'", region_id=3),
 Row(id=5, last_name="'Ellis'", email="'jellis4@sciencedirect.com'", gender="'Female'", department="'Grocery'", start_date="'9/19/2002'", salary=63702, job_title="'Software Engineer III'", region_id=7

In [10]:
df.tail(5)

[Row(id=996, last_name="'James'", email="'tjamesrn@soundcloud.com'", gender="'Female'", department="'Games'", start_date="'11/17/2013'", salary=78433, job_title="'Accountant II'", region_id=7),
 Row(id=997, last_name="'Reynolds'", email="'dreynoldsro@blogtalkradio.com'", gender="'Female'", department="'Computers'", start_date="'4/16/2007'", salary=120138, job_title="'Statistician IV'", region_id=1),
 Row(id=998, last_name="'Walker'", email="'kwalkerrp@unicef.org'", gender="'Female'", department="'Games'", start_date="'2/13/2010'", salary=60363, job_title="'Account Coordinator'", region_id=1),
 Row(id=999, last_name="'Kennedy'", email="'lkennedyrq@edublogs.org'", gender="'Male'", department="'Industrial'", start_date="'9/22/2004'", salary=48050, job_title="'Graphic Designer'", region_id=2),
 Row(id=1000, last_name="'Howard'", email="'showardrr@addtoany.com'", gender="'Male'", department="'Baby'", start_date="'11/7/2003'", salary=148687, job_title="'General Manager'", region_id=3)]

In [11]:
df.count()

1000

In [12]:
df.describe().show()

+-------+-----------------+---------+--------------------+--------+------------+----------+------------------+--------------------+-----------------+
|summary|               id|last_name|               email|  gender|  department|start_date|            salary|           job_title|        region_id|
+-------+-----------------+---------+--------------------+--------+------------+----------+------------------+--------------------+-----------------+
|  count|             1000|     1000|                1000|    1000|        1000|      1000|              1000|                1000|             1000|
|   mean|            500.5|     null|                null|    null|        null|      null|         97331.223|                null|            3.971|
| stddev|288.8194360957494|     null|                null|    null|        null|      null|31753.109191299463|                null|2.017977014544822|
|    min|                1|  'Adams'|'aalexandero8@his...|'Female'|'Automotive'|'1/1/2004'|         

In [13]:
df.select('id', 'last_name', 'salary').describe().show()

+-------+-----------------+---------+------------------+
|summary|               id|last_name|            salary|
+-------+-----------------+---------+------------------+
|  count|             1000|     1000|              1000|
|   mean|            500.5|     null|         97331.223|
| stddev|288.8194360957494|     null|31753.109191299463|
|    min|                1|  'Adams'|             40138|
|    max|             1000|  'Young'|            149929|
+-------+-----------------+---------+------------------+



In [14]:
df_sample = df.sample(withReplacement = False, fraction = 0.1, seed = 0)
df_sample

DataFrame[id: int, last_name: string, email: string, gender: string, department: string, start_date: string, salary: int, job_title: string, region_id: int]

In [15]:
df_sample.count()

103

In [16]:
df_sample.head(5)

[Row(id=3, last_name="'Carr'", email="'fcarr2@woothemes.com'", gender="'Male'", department="'Automotive'", start_date="'7/12/2009'", salary=101768, job_title="'Recruiting Manager'", region_id=3),
 Row(id=10, last_name="'Sanchez'", email="'rsanchez9@cloudflare.com'", gender="'Male'", department="'Movies'", start_date="'3/13/2013'", salary=108093, job_title="'Sales Representative'", region_id=1),
 Row(id=17, last_name="'Morgan'", email="'dmorgang@123-reg.co.uk'", gender="'Female'", department="'Kids'", start_date="'5/4/2011'", salary=148952, job_title="'Programmer IV'", region_id=6),
 Row(id=27, last_name="'Owens'", email="'cowensq@shareasale.com'", gender="'Female'", department="'Home'", start_date="'5/26/2005'", salary=78698, job_title="'Executive Secretary'", region_id=2),
 Row(id=47, last_name="'Oliver'", email="'joliver1a@cnbc.com'", gender="'Female'", department="'Clothing'", start_date="'8/30/2013'", salary=42797, job_title="'Software Engineer III'", region_id=5)]

In [17]:
df_sample.describe().show()

+-------+------------------+-----------+--------------------+--------+------------+-----------+------------------+-------------------+------------------+
|summary|                id|  last_name|               email|  gender|  department| start_date|            salary|          job_title|         region_id|
+-------+------------------+-----------+--------------------+--------+------------+-----------+------------------+-------------------+------------------+
|  count|               103|        103|                 103|     103|         103|        103|               103|                103|               103|
|   mean| 473.3883495145631|       null|                null|    null|        null|       null|103965.12621359223|               null| 3.766990291262136|
| stddev|288.20340151922073|       null|                null|    null|        null|       null| 30953.85006484419|               null|1.9813042025499445|
|    min|                 3|'Alexander'|'aalexandero8@his...|'Female'|'Autom

In [18]:
df_sample.collect()

[Row(id=3, last_name="'Carr'", email="'fcarr2@woothemes.com'", gender="'Male'", department="'Automotive'", start_date="'7/12/2009'", salary=101768, job_title="'Recruiting Manager'", region_id=3),
 Row(id=10, last_name="'Sanchez'", email="'rsanchez9@cloudflare.com'", gender="'Male'", department="'Movies'", start_date="'3/13/2013'", salary=108093, job_title="'Sales Representative'", region_id=1),
 Row(id=17, last_name="'Morgan'", email="'dmorgang@123-reg.co.uk'", gender="'Female'", department="'Kids'", start_date="'5/4/2011'", salary=148952, job_title="'Programmer IV'", region_id=6),
 Row(id=27, last_name="'Owens'", email="'cowensq@shareasale.com'", gender="'Female'", department="'Home'", start_date="'5/26/2005'", salary=78698, job_title="'Executive Secretary'", region_id=2),
 Row(id=47, last_name="'Oliver'", email="'joliver1a@cnbc.com'", gender="'Female'", department="'Clothing'", start_date="'8/30/2013'", salary=42797, job_title="'Software Engineer III'", region_id=5),
 Row(id=58, last

In [19]:
df_sample.show()

+---+------------+--------------------+--------+-------------+------------+------+--------------------+---------+
| id|   last_name|               email|  gender|   department|  start_date|salary|           job_title|region_id|
+---+------------+--------------------+--------+-------------+------------+------+--------------------+---------+
|  3|      'Carr'|'fcarr2@woothemes...|  'Male'| 'Automotive'| '7/12/2009'|101768|'Recruiting Manager'|        3|
| 10|   'Sanchez'|'rsanchez9@cloudf...|  'Male'|     'Movies'| '3/13/2013'|108093|'Sales Representa...|        1|
| 17|    'Morgan'|'dmorgang@123-reg...|'Female'|       'Kids'|  '5/4/2011'|148952|     'Programmer IV'|        6|
| 27|     'Owens'|'cowensq@shareasa...|'Female'|       'Home'| '5/26/2005'| 78698|'Executive Secret...|        2|
| 47|    'Oliver'|'joliver1a@cnbc.com'|'Female'|   'Clothing'| '8/30/2013'| 42797|'Software Enginee...|        5|
| 58|     'Baker'|'jbaker1l@usnews....|  'Male'|      'Games'|  '3/2/2007'| 68857|'Infor

In [20]:
type(df_sample)

### Preprocessing

#### Min-Max Scaling

In [21]:
from pyspark.ml import feature
from pyspark.ml.linalg import Vectors

In [22]:
df2 = spark.createDataFrame(data = [(1, Vectors.dense([10.0, 10000.0, 1.0])),
                                    (2, Vectors.dense([20.0, 20000.0, 2.0])),
                                    (3, Vectors.dense([30.0, 30000.0, 3.0]))],
                            schema = ["id", "features"])
df2

DataFrame[id: bigint, features: vector]

In [23]:
df2.show()

+---+------------------+
| id|          features|
+---+------------------+
|  1|[10.0,10000.0,1.0]|
|  2|[20.0,20000.0,2.0]|
|  3|[30.0,30000.0,3.0]|
+---+------------------+



In [24]:
df2.schema

StructType(List(StructField(id,LongType,true),StructField(features,VectorUDT,true)))

In [25]:
df2.columns

['id', 'features']

In [26]:
df2.collect()

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0])),
 Row(id=2, features=DenseVector([20.0, 20000.0, 2.0])),
 Row(id=3, features=DenseVector([30.0, 30000.0, 3.0]))]

In [27]:
minmax_scaler = feature.MinMaxScaler(inputCol = "features",
                                     outputCol = "minmax_features")
minmax_scaler = minmax_scaler.fit(dataset = df2)
df2_scaled = minmax_scaler.transform(dataset = df2)
df2_scaled.show()

+---+------------------+---------------+
| id|          features|minmax_features|
+---+------------------+---------------+
|  1|[10.0,10000.0,1.0]|      (3,[],[])|
|  2|[20.0,20000.0,2.0]|  [0.5,0.5,0.5]|
|  3|[30.0,30000.0,3.0]|  [1.0,1.0,1.0]|
+---+------------------+---------------+



In [28]:
df2_scaled.limit(2).collect()

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), minmax_features=SparseVector(3, {})),
 Row(id=2, features=DenseVector([20.0, 20000.0, 2.0]), minmax_features=DenseVector([0.5, 0.5, 0.5]))]

In [29]:
df2_scaled.collect()

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), minmax_features=SparseVector(3, {})),
 Row(id=2, features=DenseVector([20.0, 20000.0, 2.0]), minmax_features=DenseVector([0.5, 0.5, 0.5])),
 Row(id=3, features=DenseVector([30.0, 30000.0, 3.0]), minmax_features=DenseVector([1.0, 1.0, 1.0]))]

In [30]:
df2_scaled.select("features", "minmax_features").show()

+------------------+---------------+
|          features|minmax_features|
+------------------+---------------+
|[10.0,10000.0,1.0]|      (3,[],[])|
|[20.0,20000.0,2.0]|  [0.5,0.5,0.5]|
|[30.0,30000.0,3.0]|  [1.0,1.0,1.0]|
+------------------+---------------+



#### Standardization

In [31]:
df2 = spark.createDataFrame(data = [(1, Vectors.dense([10.0, 10000.0, 1.0])),
                                    (2, Vectors.dense([20.0, 20000.0, 2.0])),
                                    (3, Vectors.dense([30.0, 30000.0, 3.0])),
                                    (3, Vectors.dense([30.0, 30000.0, 3.0])),
                                    (3, Vectors.dense([40.0, 40000.0, 4.0]))],
                            schema = ["id", "features"])
df2.show()

+---+------------------+
| id|          features|
+---+------------------+
|  1|[10.0,10000.0,1.0]|
|  2|[20.0,20000.0,2.0]|
|  3|[30.0,30000.0,3.0]|
|  3|[30.0,30000.0,3.0]|
|  3|[40.0,40000.0,4.0]|
+---+------------------+



In [32]:
scaler = feature.StandardScaler(inputCol = "features",
                                outputCol = "s_features",
                                withMean = True,
                                withStd = True)
scaler = scaler.fit(dataset = df2)
df2_scaled = scaler.transform(dataset = df2)
df2_scaled.show()

+---+------------------+--------------------+
| id|          features|          s_features|
+---+------------------+--------------------+
|  1|[10.0,10000.0,1.0]|[-1.4032928308912...|
|  2|[20.0,20000.0,2.0]|[-0.5262348115842...|
|  3|[30.0,30000.0,3.0]|[0.35082320772281...|
|  3|[30.0,30000.0,3.0]|[0.35082320772281...|
|  3|[40.0,40000.0,4.0]|[1.22788122702984...|
+---+------------------+--------------------+



In [33]:
df2_scaled.show(truncate = False)

+---+------------------+-------------------------------------------------------------+
|id |features          |s_features                                                   |
+---+------------------+-------------------------------------------------------------+
|1  |[10.0,10000.0,1.0]|[-1.403292830891247,-1.4032928308912467,-1.403292830891247]  |
|2  |[20.0,20000.0,2.0]|[-0.5262348115842176,-0.5262348115842175,-0.5262348115842177]|
|3  |[30.0,30000.0,3.0]|[0.3508232077228117,0.35082320772281167,0.35082320772281167] |
|3  |[30.0,30000.0,3.0]|[0.3508232077228117,0.35082320772281167,0.35082320772281167] |
|3  |[40.0,40000.0,4.0]|[1.2278812270298411,1.2278812270298407,1.227881227029841]    |
+---+------------------+-------------------------------------------------------------+



#### Bucketizer

In [34]:
df2 = spark.createDataFrame(data = [(1, 5000.0),
                                    (2, 15000.0),
                                    (3, 30000.0),
                                    (3, 35000.0),
                                    (3, 40000.0)],
                            schema = ["id", "points"])

In [35]:
splits = [-float("inf"), 0, 10000, 30000, float("inf")]

In [36]:
bucketizer = feature.Bucketizer(splits = splits,
                                inputCol = "points",
                                outputCol = "points_buckets")

In [37]:
df2_bucket = bucketizer.transform(dataset = df2)

In [38]:
df2_bucket.show()

+---+-------+--------------+
| id| points|points_buckets|
+---+-------+--------------+
|  1| 5000.0|           1.0|
|  2|15000.0|           2.0|
|  3|30000.0|           3.0|
|  3|35000.0|           3.0|
|  3|40000.0|           3.0|
+---+-------+--------------+

